From 61947765a70534f036d6618df991cc6979d3b045 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 26 Aug 2022 17:25:21 +0100 Subject: [PATCH 1/8] Add row format --- arrow/Cargo.toml | 5 + arrow/benches/row_format.rs | 88 ++++ arrow/src/compute/kernels/sort.rs | 6 +- arrow/src/lib.rs | 1 + arrow/src/row/fixed.rs | 146 ++++++ arrow/src/row/interner.rs | 314 +++++++++++++ arrow/src/row/mod.rs | 749 ++++++++++++++++++++++++++++++ arrow/src/row/variable.rs | 97 ++++ 8 files changed, 1403 insertions(+), 3 deletions(-) create mode 100644 arrow/benches/row_format.rs create mode 100644 arrow/src/row/fixed.rs create mode 100644 arrow/src/row/interner.rs create mode 100644 arrow/src/row/mod.rs create mode 100644 arrow/src/row/variable.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index cedd48e4d31..1b2bb6fd775 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -233,3 +233,8 @@ harness = false [[bench]] name = "decimal_validate" harness = false + +[[bench]] +name = "row_format" +harness = false +required-features = ["test_utils"] diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs new file mode 100644 index 00000000000..5f554cc5737 --- /dev/null +++ b/arrow/benches/row_format.rs @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; +extern crate core; + +use arrow::array::ArrayRef; +use arrow::datatypes::{Int64Type, UInt64Type}; +use arrow::row::RowConverter; +use arrow::util::bench_util::{create_primitive_array, create_string_array_with_len}; +use criterion::{black_box, Criterion}; +use std::sync::Arc; + +fn row_bench(c: &mut Criterion) { + let cols = vec![Arc::new(create_primitive_array::<UInt64Type>(4096, 0.)) as ArrayRef]; + + c.bench_function("row_batch 4096 u64(0)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = vec![Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef]; + + c.bench_function("row_batch 4096 i64(0)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = + vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 10)) as ArrayRef]; + + c.bench_function("row_batch 4096 string(10, 0)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = + vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef]; + + c.bench_function("row_batch 4096 string(30, 0)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = + vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef]; + + c.bench_function("row_batch 4096 string(100, 0)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = + vec![Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 100)) as ArrayRef]; + + c.bench_function("row_batch 4096 string(100, 0.5)", |b| { + b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + }); + + let cols = [ + Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 20)) as ArrayRef, + 
Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef, + Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef, + Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef, + ]; + + // `RowConverter::convert` asserts one `SortOptions` per column, so size the options from `cols` + c.bench_function( + "row_batch 4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)", + |b| { + b.iter(|| { + black_box(RowConverter::new(vec![Default::default(); cols.len()]).convert(&cols)) + }); + }, + ); +} + +criterion_group!(benches, row_bench); +criterion_main!(benches); diff --git a/arrow/src/compute/kernels/sort.rs b/arrow/src/compute/kernels/sort.rs index 0e2273e9252..bd73aede1d1 100644 --- a/arrow/src/compute/kernels/sort.rs +++ b/arrow/src/compute/kernels/sort.rs @@ -941,13 +941,13 @@ type LexicographicalCompareItem<'a> = ( /// A lexicographical comparator that wraps given array data (columns) and can lexicographically compare data /// at given two indices. The lifetime is the same at the data wrapped. -pub(super) struct LexicographicalComparator<'a> { +pub(crate) struct LexicographicalComparator<'a> { compare_items: Vec>, } impl LexicographicalComparator<'_> { /// lexicographically compare values at the wrapped columns with given indices. - pub(super) fn compare<'a, 'b>( + pub(crate) fn compare<'a, 'b>( &'a self, a_idx: &'b usize, b_idx: &'b usize, @@ -991,7 +991,7 @@ impl LexicographicalComparator<'_> { /// Create a new lex comparator that will wrap the given sort columns and give comparison /// results with two indices. 
- pub(super) fn try_new( + pub(crate) fn try_new( columns: &[SortColumn], ) -> Result> { let compare_items = columns diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index d1fb0cae0da..87a4799e3e2 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -269,6 +269,7 @@ pub mod json; #[cfg(feature = "pyarrow")] pub mod pyarrow; pub mod record_batch; +pub mod row; pub mod temporal_conversions; pub mod tensor; pub mod util; diff --git a/arrow/src/row/fixed.rs b/arrow/src/row/fixed.rs new file mode 100644 index 00000000000..41444d769bf --- /dev/null +++ b/arrow/src/row/fixed.rs @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::array::PrimitiveArray; +use crate::compute::SortOptions; +use crate::datatypes::ArrowPrimitiveType; +use crate::row::Rows; +use crate::util::decimal::{Decimal128, Decimal256}; +use half::f16; + +pub trait FixedLengthEncoding: Copy { + fn encode(self) -> [u8; N]; +} + +impl FixedLengthEncoding<1> for bool { + fn encode(self) -> [u8; 1] { + [self as u8] + } +} + +macro_rules! 
encode_signed { + ($n:expr, $t:ty) => { + impl FixedLengthEncoding<$n> for $t { + fn encode(self) -> [u8; $n] { + let mut b = self.to_be_bytes(); + // Toggle top bit + b[0] ^= 0x80; + b + } + } + }; +} + +encode_signed!(1, i8); +encode_signed!(2, i16); +encode_signed!(4, i32); +encode_signed!(8, i64); +encode_signed!(16, i128); + +macro_rules! encode_unsigned { + ($n:expr, $t:ty) => { + impl FixedLengthEncoding<$n> for $t { + fn encode(self) -> [u8; $n] { + self.to_be_bytes() + } + } + }; +} + +encode_unsigned!(1, u8); +encode_unsigned!(2, u16); +encode_unsigned!(4, u32); +encode_unsigned!(8, u64); + +impl FixedLengthEncoding<2> for f16 { + fn encode(self) -> [u8; 2] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i16; + let val = s ^ (((s >> 15) as u16) >> 1) as i16; + val.encode() + } +} + +impl FixedLengthEncoding<4> for f32 { + fn encode(self) -> [u8; 4] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i32; + let val = s ^ (((s >> 31) as u32) >> 1) as i32; + val.encode() + } +} + +impl FixedLengthEncoding<8> for f64 { + fn encode(self) -> [u8; 8] { + // https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260 + let s = self.to_bits() as i64; + let val = s ^ (((s >> 63) as u64) >> 1) as i64; + val.encode() + } +} + +impl FixedLengthEncoding<16> for Decimal128 { + fn encode(self) -> [u8; 16] { + let mut val = *self.raw_value(); + val.reverse(); + val[0] ^= 0x80; + val + } +} + +impl FixedLengthEncoding<32> for Decimal256 { + fn encode(self) -> [u8; 32] { + let mut val = *self.raw_value(); + val.reverse(); + val[0] ^= 0x80; + val + } +} + +pub const fn encoded_len(_col: &PrimitiveArray) -> usize { + std::mem::size_of::() + 1 +} + +/// Fixed width types are encoded as +/// +/// - 1 byte `0` if 
null or `1` if valid +/// - bytes of [`FixedLengthEncoding`] +pub fn encode< + const N: usize, + T: FixedLengthEncoding, + I: IntoIterator>, +>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + let end_offset = *offset + N + 1; + if let Some(val) = maybe_val { + let to_write = &mut out.buffer[*offset..end_offset]; + to_write[0] = 1; + let mut encoded = val.encode(); + if opts.descending { + encoded.iter_mut().for_each(|v| *v = !*v) + } + to_write[1..].copy_from_slice(&encoded) + } else if !opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset = end_offset; + } +} diff --git a/arrow/src/row/interner.rs b/arrow/src/row/interner.rs new file mode 100644 index 00000000000..e139d261f7a --- /dev/null +++ b/arrow/src/row/interner.rs @@ -0,0 +1,314 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::cmp::Ordering; +use std::ops::Index; + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct Interned(u32); + +/// A byte array interner that generates normalized keys that are sorted with respect +/// to the interned values, e.g. 
`inter(a) < intern(b) => a < b` +#[derive(Debug, Default)] +pub struct OrderPreservingInterner { + keys: InternBuffer, + values: InternBuffer, + bucket: Box, + + hasher: ahash::RandomState, + lookup: HashMap, +} + +impl OrderPreservingInterner { + /// Interns an iterator of values returning a list of [`Interned`] which can be + /// used with [`Self::normalized_key`] to retrieve the normalized keys with a + /// lifetime not tied to the mutable borrow passed to this method + pub fn intern(&mut self, input: I) -> Vec + where + I: IntoIterator, + V: AsRef<[u8]>, + { + let iter = input.into_iter(); + let capacity = iter.size_hint().0; + let mut out = Vec::with_capacity(capacity); + let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); + let mut to_intern_len = 0; + + for (idx, value) in iter.enumerate() { + let v = value.as_ref(); + let hash = self.hasher.hash_one(v); + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == v); + + match entry { + RawEntryMut::Occupied(o) => out.push(*o.key()), + RawEntryMut::Vacant(_) => { + // Push placeholder + out.push(Interned(0)); + to_intern_len += v.len(); + to_intern.push((idx, hash, value)); + } + }; + } + + to_intern.sort_unstable_by(|(_, _, a), (_, _, b)| a.as_ref().cmp(b.as_ref())); + + self.keys.offsets.reserve(to_intern.len()); + self.keys.values.reserve(to_intern.len()); // Approximation + self.values.offsets.reserve(to_intern.len()); + self.values.values.reserve(to_intern_len); + + for (idx, hash, value) in to_intern { + let val = value.as_ref(); + + let entry = self + .lookup + .raw_entry_mut() + .from_hash(hash, |a| &self.values[*a] == val); + + match entry { + RawEntryMut::Occupied(o) => { + out[idx] = *o.key(); + } + RawEntryMut::Vacant(v) => { + let val = value.as_ref(); + self.bucket + .insert(&mut self.values, val, &mut self.keys.values); + self.keys.values.push(0); + let interned = self.keys.append(); + + let hasher = &mut self.hasher; + let values = 
&self.values; + v.insert_with_hasher(hash, interned, (), |key| { + hasher.hash_one(&values[*key]) + }); + out[idx] = interned; + } + } + } + + out + } + + /// Returns a null-terminated byte array that can be compared against other normalized_key + /// returned by this instance, to establish ordering of the interned values + pub fn normalized_key(&self, key: Interned) -> &[u8] { + &self.keys[key] + } +} + +/// A buffer of `[u8]` indexed by `[Interned]` +#[derive(Debug)] +struct InternBuffer { + values: Vec, + offsets: Vec, +} + +impl Default for InternBuffer { + fn default() -> Self { + Self { + values: Default::default(), + offsets: vec![0], + } + } +} + +impl InternBuffer { + /// Insert `data` returning the corresponding [`Interned`] + fn insert(&mut self, data: &[u8]) -> Interned { + self.values.extend_from_slice(data); + self.append() + } + + /// Appends the next value based on data written to `self.values` + /// returning the corresponding [`Interned`] + fn append(&mut self) -> Interned { + let idx = self.offsets.len() - 1; + let key = Interned(idx.try_into().unwrap()); + self.offsets.push(self.values.len()); + key + } +} + +impl Index for InternBuffer { + type Output = [u8]; + + fn index(&self, key: Interned) -> &Self::Output { + let index = key.0 as usize; + let end = self.offsets[index + 1]; + let start = self.offsets[index]; + unsafe { self.values.get_unchecked(start..end) } + } +} + +/// A slot corresponds to a single byte-value in the generated normalized key +/// +/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing +/// the next byte in the generated normalized key +#[derive(Debug, Default, Clone)] +struct Slot { + value: Option, + /// Child values smaller than `self.value` if any + child: Option>, +} + +/// Each bucket corresponds to a single byte in the normalized key +#[derive(Debug, Clone)] +struct Bucket { + slots: Box<[Slot]>, +} + +impl Default for Bucket { + fn default() -> Self { + let slots = 
(0..255).map(|_| Slot::default()).collect::>().into(); + Self { slots } + } +} + +impl Bucket { + /// Perform a skewed binary search to find the first slot that is empty or less + /// + /// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)` + /// containing the slot index to insert at + fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result { + let mut size = self.slots.len() - 1; + let mut left = 0; + let mut right = size; + while left < right { + // Skew binary search to leave gaps of at most 3 elements + let mid = left + (size / 2).min(3); + + let slot = &self.slots[mid]; + let val = match slot.value { + Some(val) => val, + None => return Err(mid), + }; + + let cmp = values_buf[val].cmp(data); + if cmp == Ordering::Less { + left = mid + 1; + } else if cmp == Ordering::Greater { + right = mid; + } else { + return Ok(mid); + } + + size = right - left; + } + Err(left) + } + + /// Insert `data` into this bucket or one of its children, appending the + /// normalized key to `out` as it is constructed + /// + /// # Panics + /// + /// Panics if the value already exists + fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec) { + match self.insert_pos(values_buf, data) { + Ok(_) => unreachable!("value already exists"), + Err(idx) => { + let slot = &mut self.slots[idx]; + // Must always spill final slot so can always create a value less than + if idx != 254 && slot.value.is_none() { + out.push(idx as u8 + 2); + slot.value = Some(values_buf.insert(data)) + } else { + out.push(idx as u8 + 1); + slot.child + .get_or_insert_with(Default::default) + .insert(values_buf, data, out); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::prelude::*; + + // Clippy isn't smart enough to understand dropping mutability + #[allow(clippy::needless_collect)] + fn test_intern_values(values: &[u64]) { + let mut interner = OrderPreservingInterner::default(); + + // Intern a single value at a time to check 
ordering + let interned: Vec<_> = values + .iter() + .flat_map(|v| interner.intern([&v.to_be_bytes()])) + .collect(); + + let interned: Vec<_> = interned + .into_iter() + .map(|x| interner.normalized_key(x)) + .collect(); + + for (i, a) in interned.iter().enumerate() { + for (j, b) in interned.iter().enumerate() { + let interned_cmp = a.cmp(b); + let values_cmp = values[i].cmp(&values[j]); + assert_eq!( + interned_cmp, values_cmp, + "({:?} vs {:?}) vs ({} vs {})", + a, b, values[i], values[j] + ) + } + } + } + + #[test] + fn test_interner() { + test_intern_values(&[8, 6, 5, 7]); + + let mut values: Vec<_> = (0_u64..2000).collect(); + test_intern_values(&values); + + let mut rng = thread_rng(); + values.shuffle(&mut rng); + test_intern_values(&values); + } + + #[test] + fn test_intern_duplicates() { + // Unsorted with duplicates + let values = vec![0_u8, 1, 8, 4, 1, 0]; + let mut interner = OrderPreservingInterner::default(); + + let interned = interner.intern(values.iter().map(std::slice::from_ref)); + + assert_eq!(interned[0], interned[5]); + assert_eq!(interned[1], interned[4]); + assert!( + interner.normalized_key(interned[0]) < interner.normalized_key(interned[1]) + ); + assert!( + interner.normalized_key(interned[1]) < interner.normalized_key(interned[2]) + ); + assert!( + interner.normalized_key(interned[1]) < interner.normalized_key(interned[3]) + ); + assert!( + interner.normalized_key(interned[3]) < interner.normalized_key(interned[2]) + ); + } +} diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs new file mode 100644 index 00000000000..0b1f1f7c83e --- /dev/null +++ b/arrow/src/row/mod.rs @@ -0,0 +1,749 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A comparable row-oriented representation of a collection of [`Array`] + +use crate::array::{ + as_boolean_array, as_generic_binary_array, as_largestring_array, as_string_array, + Array, ArrayRef, Decimal128Array, Decimal256Array, +}; +use crate::compute::SortOptions; +use crate::datatypes::*; +use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::{downcast_dictionary_array, downcast_primitive_array}; + +mod fixed; +mod interner; +mod variable; + +/// Converts arrays into a row-oriented format that are [normalized for sorting]. +/// +/// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient +/// to establish the ordering of two rows, allowing for extremely fast comparisons, +/// and permitting the use of [non-comparison sorts] such as [radix sort] +/// +/// Comparing [`Rows`] generated by different [`RowConverter`] is not guaranteed to +/// yield a meaningful ordering +/// +/// # Format +/// +/// The encoding of the row format should not be considered stable, but is documented here +/// for reference. 
+/// +/// ## Unsigned Integer Encoding +/// +/// A null integer is encoded as a `0_u8`, followed by zeroed bytes corresponding +/// to the integer's length +/// +/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the +/// integer +/// +/// ## Signed Integer Encoding +/// +/// Signed integers have their most significant (sign) bit flipped, and are then encoded in the +/// same manner as an unsigned integer +/// +/// ## Float Encoding +/// +/// Floats are converted from IEEE 754 representation to a signed integer representation +/// by flipping all bits except the sign bit if they are negative. +/// +/// They are then encoded in the same manner as a signed integer +/// +/// ## Variable Length Bytes Encoding +/// +/// A null is encoded as a `0_u8` +/// +/// An empty byte array is encoded as `1_u8` +/// +/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array +/// encoded using the block-based scheme described below. +/// +/// The byte array is broken up into 32-byte blocks; each block is written in turn +/// to the output, followed by `0xFF_u8`. The final block is padded to 32 bytes +/// with `0_u8` and written to the output, followed by the un-padded length in bytes +/// of this final block as a `u8` +/// +/// This is loosely inspired by [COBS] encoding, and chosen over more traditional +/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256. +/// +/// ## Dictionary Encoding +/// +/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted and +/// potentially distinct dictionaries. One simple mechanism to handle this would be to reverse +/// the dictionary encoding, and encode the array values directly; however, this would lose +/// the benefits of dictionary encoding to reduce memory and CPU consumption. +/// +/// As such the [`RowConverter`] maintains an order-preserving dictionary encoding for each +/// dictionary encoded column. 
As this is a variable-length encoding, new dictionary values +/// can be added whilst preserving the sort order. +/// +/// A null dictionary value is encoded as `0_u8`. +/// +/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array +/// key determined by the order-preserving dictionary encoding. +/// +/// # Ordering +/// +/// ## Float Ordering +/// +/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined +/// in the IEEE 754 (2008 revision) floating point standard. +/// +/// The ordering established by this does not always agree with the +/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`. For example, +/// they consider negative and positive zero equal, while this does not. +/// +/// ## Null Ordering +/// +/// The encoding described above will order nulls first; this can be inverted by representing +/// nulls as `0xFF_u8` instead of `0_u8`. +/// +/// ## Reverse Column Ordering +/// +/// The order of a given column can be reversed by bitwise-negating the encoded bytes of non-null values. +/// +/// ## Reconstruction +/// +/// Given a schema it would theoretically be possible to reconstruct the columnar data from +/// the row format; however, this is currently not supported. 
It is recommended that the row +/// format is instead used to obtain a sorted list of row indices, which can then be used +/// with [`take`](crate::compute::take) to obtain a sorted [`Array`] +/// +/// [non-comparison sorts]:[https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts] +/// [radix sort]:[https://en.wikipedia.org/wiki/Radix_sort] +/// [normalized for sorting]:[https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf] +/// [`memcmp`]:[https://www.man7.org/linux/man-pages/man3/memcmp.3.html] +/// [COBS]:[https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing] +/// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] +#[derive(Debug)] +pub struct RowConverter { + options: Vec, + dictionaries: Vec>>, +} + +impl RowConverter { + /// Create a new [`RowConverter`] with the following schema and options + pub fn new(options: Vec) -> Self { + let dictionaries = (0..options.len()).map(|_| None).collect(); + Self { + dictionaries, + options, + } + } + + /// Convert `cols` into [`Rows`] + /// + /// # Panics + /// + /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] + pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows { + assert_eq!(arrays.len(), self.options.len(), "column count mismatch"); + + let dictionaries: Vec<_> = arrays + .iter() + .zip(&mut self.dictionaries) + .map(|(array, dictionary)| { + let values = downcast_dictionary_array! 
{ + array => array.values(), + _ => return None + }; + + let interner = dictionary.get_or_insert_with(Default::default); + + let mapping: Vec<_> = compute_dictionary_mapping(interner, values) + .into_iter() + .map(|interned| interner.normalized_key(interned)) + .collect(); + + Some(mapping) + }) + .collect(); + + let mut rows = new_empty_rows(arrays, &dictionaries); + + for ((array, options), dictionary) in + arrays.iter().zip(&self.options).zip(dictionaries) + { + // We encode a column at a time to minimise dispatch overheads + encode_column(&mut rows, array, *options, dictionary.as_deref()) + } + + if cfg!(debug_assertions) { + assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len()); + rows.offsets + .windows(2) + .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic")); + } + + rows + } +} + +/// A row-oriented representation of arrow data, that is normalized for comparison +/// +/// See [`RowConverter`] +#[derive(Debug)] +pub struct Rows { + buffer: Box<[u8]>, + offsets: Box<[usize]>, +} + +impl Rows { + pub fn row(&self, row: usize) -> Row<'_> { + let end = self.offsets[row + 1]; + let start = self.offsets[row]; + Row(&self.buffer[start..end]) + } + + pub fn num_rows(&self) -> usize { + self.offsets.len() - 1 + } +} + +/// A comparable representation of a row, see [`Rows`] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct Row<'a>(&'a [u8]); + +impl<'a> AsRef<[u8]> for Row<'a> { + fn as_ref(&self) -> &[u8] { + self.0 + } +} + +/// Computes the dictionary mapping for the given dictionary values +fn compute_dictionary_mapping( + interner: &mut OrderPreservingInterner, + values: &ArrayRef, +) -> Vec { + use fixed::FixedLengthEncoding; + downcast_primitive_array! 
{ + values => interner + .intern(values.iter().map(|x| x.map(|x| x.encode()) + .unwrap_or_default())), + DataType::Binary => { + let iter = as_generic_binary_array::(values).iter().map(|x| x.unwrap_or_default()); + interner.intern(iter) + } + DataType::LargeBinary => { + let iter = as_generic_binary_array::(values).iter().map(|x| x.unwrap_or_default()); + interner.intern(iter) + } + DataType::Utf8 => { + let iter = as_string_array(values).iter().map(|x| x.unwrap_or_default().as_bytes()); + interner.intern(iter) + } + DataType::LargeUtf8 => { + let iter = as_largestring_array(values).iter().map(|x| x.unwrap_or_default().as_bytes()); + interner.intern(iter) + } + t => unreachable!("dictionary value {} is not supported", t) + } +} + +/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] +fn new_empty_rows(cols: &[ArrayRef], dictionaries: &[Option>]) -> Rows { + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); + let mut lengths = vec![0; num_rows]; + + for (array, dict) in cols.iter().zip(dictionaries) { + downcast_primitive_array! 
{ + array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), + DataType::Null => lengths.iter_mut().for_each(|x| *x += 1), + DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 2), + DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += 17), + DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += 33), + DataType::Binary => as_generic_binary_array::(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::LargeBinary => as_generic_binary_array::(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| *length += variable::encoded_len(slice)), + DataType::Utf8 => as_string_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::LargeUtf8 => as_largestring_array(array) + .iter() + .zip(lengths.iter_mut()) + .for_each(|(slice, length)| { + *length += variable::encoded_len(slice.map(|x| x.as_bytes())) + }), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => { + let dict = dict.as_ref().unwrap(); + for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { + match v { + Some(v) => *length += dict[v as usize].len() + 1, + None => *length += 1, + } + } + } + _ => unreachable!(), + } + t => unimplemented!("not yet implemented: {}", t) + } + } + + let mut offsets = Vec::with_capacity(num_rows + 1); + offsets.push(0); + + let mut cur_offset = 0_usize; + for l in lengths { + offsets.push(cur_offset); + cur_offset = cur_offset.checked_add(l).expect("overflow"); + } + + let buffer = vec![0_u8; cur_offset]; + + Rows { + buffer: buffer.into(), + offsets: offsets.into(), + } +} + +/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses +fn encode_column( + out: &mut Rows, + array: &ArrayRef, + opts: SortOptions, + dictionary: Option<&[&[u8]]>, +) { + downcast_primitive_array! 
{ + array => fixed::encode(out, array, opts), + DataType::Null => { + fixed::encode(out, std::iter::repeat(None::).take(array.len()), opts) + } + DataType::Boolean => fixed::encode(out, as_boolean_array(array), opts), + DataType::Decimal128(_, _) => fixed::encode( + out, + array.as_any().downcast_ref::().unwrap(), + opts, + ), + DataType::Decimal256(_, _) => fixed::encode( + out, + array.as_any().downcast_ref::().unwrap(), + opts, + ), + DataType::Binary => { + variable::encode(out, as_generic_binary_array::(array).iter(), opts) + } + DataType::LargeBinary => { + variable::encode(out, as_generic_binary_array::(array).iter(), opts) + } + DataType::Utf8 => variable::encode( + out, + as_string_array(array).iter().map(|x| x.map(|x| x.as_bytes())), + opts, + ), + DataType::LargeUtf8 => variable::encode( + out, + as_largestring_array(array) + .iter() + .map(|x| x.map(|x| x.as_bytes())), + opts, + ), + DataType::Dictionary(_, _) => downcast_dictionary_array! { + array => { + let dict = dictionary.unwrap(); + for (offset, k) in out.offsets.iter_mut().skip(1).zip(array.keys()) { + match k { + Some(k) => { + let v = &dict[k as usize]; + let end_offset = *offset + 1 + v.len(); + out.buffer[*offset] = 1; + out.buffer[*offset+1..end_offset].copy_from_slice(v); + if opts.descending { + out.buffer[*offset..end_offset].iter_mut().for_each(|v| *v = !*v) + } + *offset = end_offset; + } + None => { + if !opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset += 1; + } + } + } + }, + _ => unreachable!() + } + t => unimplemented!("not yet implemented: {}", t) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::{ + BinaryArray, BooleanArray, DictionaryArray, Float32Array, GenericStringArray, + Int16Array, OffsetSizeTrait, PrimitiveArray, StringArray, + }; + use crate::compute::{LexicographicalComparator, SortColumn}; + use crate::util::display::array_value_to_string; + use rand::distributions::uniform::SampleUniform; + use rand::distributions::{Distribution, 
Standard}; + use rand::{thread_rng, Rng}; + use std::sync::Arc; + + #[test] + fn test_fixed_width() { + let cols = [ + Arc::new(Int16Array::from_iter([ + Some(1), + Some(2), + None, + Some(-5), + Some(2), + Some(2), + Some(0), + ])) as ArrayRef, + Arc::new(Float32Array::from_iter([ + Some(1.3), + Some(2.5), + None, + Some(4.), + Some(0.1), + Some(-4.), + Some(-0.), + ])) as ArrayRef, + ]; + + let mut converter = + RowConverter::new(vec![Default::default(), Default::default()]); + let rows = converter.convert(&cols); + + assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]); + assert_eq!( + rows.buffer.as_ref(), + &[ + 1, 128, 1, // + 1, 191, 166, 102, 102, // + 1, 128, 2, // + 1, 192, 32, 0, 0, // + 0, 0, 0, // + 0, 0, 0, 0, 0, // + 1, 127, 251, // + 1, 192, 128, 0, 0, // + 1, 128, 2, // + 1, 189, 204, 204, 205, // + 1, 128, 2, // + 1, 63, 127, 255, 255, // + 1, 128, 0, // + 1, 127, 255, 255, 255 // + ] + ); + + assert!(rows.row(3) < rows.row(6)); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(4) < rows.row(1)); + assert!(rows.row(5) < rows.row(4)) + } + + #[test] + fn test_bool() { + let mut converter = RowConverter::new(vec![Default::default()]); + + let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); + let rows = converter.convert(&[col]); + assert!(rows.row(2) > rows.row(1)); + assert!(rows.row(2) > rows.row(0)); + assert!(rows.row(1) > rows.row(0)); + + let mut converter = RowConverter::new(vec![SortOptions { + descending: true, + nulls_first: false, + }]); + + let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); + let rows = converter.convert(&[col]); + assert!(rows.row(2) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(1) < rows.row(0)); + } + + #[test] + fn test_variable_width() { + let col = Arc::new(StringArray::from_iter([ + Some("hello"), + Some("he"), + None, + Some("foo"), + Some(""), + ])); + + let mut converter = 
RowConverter::new(vec![(Default::default())]); + let rows = converter.convert(&[col]); + + assert!(rows.row(1) < rows.row(0)); + assert!(rows.row(2) < rows.row(4)); + assert!(rows.row(3) < rows.row(0)); + assert!(rows.row(3) < rows.row(1)); + + let col = Arc::new(BinaryArray::from_iter([ + None, + Some(vec![0_u8; 0]), + Some(vec![0_u8; 6]), + Some(vec![0_u8; variable::BLOCK_SIZE]), + Some(vec![0_u8; variable::BLOCK_SIZE + 1]), + Some(vec![1_u8; 6]), + Some(vec![1_u8; variable::BLOCK_SIZE]), + Some(vec![1_u8; variable::BLOCK_SIZE + 1]), + Some(vec![0xFF_u8; 6]), + Some(vec![0xFF_u8; variable::BLOCK_SIZE]), + Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), + ])) as ArrayRef; + + let mut converter = RowConverter::new(vec![Default::default()]); + let rows = converter.convert(&[Arc::clone(&col)]); + + for i in 0..rows.num_rows() { + for j in i + 1..rows.num_rows() { + assert!( + rows.row(i) < rows.row(j), + "{} < {} - {:?} < {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } + + let mut converter = RowConverter::new(vec![SortOptions { + descending: true, + nulls_first: false, + }]); + let rows = converter.convert(&[col]); + + for i in 0..rows.num_rows() { + for j in i + 1..rows.num_rows() { + assert!( + rows.row(i) > rows.row(j), + "{} > {} - {:?} > {:?}", + i, + j, + rows.row(i), + rows.row(j) + ); + } + } + } + + #[test] + fn test_dictionary() { + let mut converter = RowConverter::new(vec![Default::default()]); + + let a = Arc::new(DictionaryArray::::from_iter([ + Some("foo"), + Some("hello"), + Some("he"), + None, + Some("hello"), + Some(""), + Some("hello"), + Some("hello"), + ])) as ArrayRef; + + let rows_a = converter.convert(&[Arc::clone(&a)]); + + assert!(rows_a.row(3) < rows_a.row(5)); + assert!(rows_a.row(2) < rows_a.row(1)); + assert!(rows_a.row(0) < rows_a.row(1)); + assert!(rows_a.row(3) < rows_a.row(0)); + + assert_eq!(rows_a.row(1), rows_a.row(4)); + assert_eq!(rows_a.row(1), rows_a.row(6)); + assert_eq!(rows_a.row(1), rows_a.row(7)); + + let b = 
Arc::new(DictionaryArray::::from_iter([ + Some("hello"), + None, + Some("cupcakes"), + ])); + + let rows_b = converter.convert(&[b]); + assert_eq!(rows_a.row(1), rows_b.row(0)); + assert_eq!(rows_a.row(3), rows_b.row(1)); + assert!(rows_b.row(2) < rows_a.row(0)); + + let mut converter = RowConverter::new(vec![SortOptions { + descending: true, + nulls_first: false, + }]); + + let rows_c = converter.convert(&[a]); + assert!(rows_c.row(3) > rows_c.row(5)); + assert!(rows_c.row(2) > rows_c.row(1)); + assert!(rows_c.row(0) > rows_c.row(1)); + assert!(rows_c.row(3) > rows_c.row(0)); + } + + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray + where + K: ArrowPrimitiveType, + Standard: Distribution, + { + let mut rng = thread_rng(); + (0..len) + .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen())) + .collect() + } + + fn generate_strings( + len: usize, + valid_percent: f64, + ) -> GenericStringArray { + let mut rng = thread_rng(); + (0..len) + .map(|_| { + rng.gen_bool(valid_percent).then(|| { + let len = rng.gen_range(0..100); + let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect(); + String::from_utf8(bytes).unwrap() + }) + }) + .collect() + } + + fn generate_dictionary( + values: ArrayRef, + len: usize, + valid_percent: f64, + ) -> DictionaryArray + where + K: ArrowDictionaryKeyType, + K::Native: SampleUniform, + { + let mut rng = thread_rng(); + let min_key = K::Native::from_usize(0).unwrap(); + let max_key = K::Native::from_usize(values.len()).unwrap(); + let keys: PrimitiveArray = (0..len) + .map(|_| { + rng.gen_bool(valid_percent) + .then(|| rng.gen_range(min_key..max_key)) + }) + .collect(); + + let data_type = DataType::Dictionary( + Box::new(K::DATA_TYPE), + Box::new(values.data_type().clone()), + ); + + let data = keys + .into_data() + .into_builder() + .data_type(data_type) + .add_child_data(values.data().clone()) + .build() + .unwrap(); + + DictionaryArray::from(data) + } + + fn generate_column(len: usize) -> ArrayRef { 
+ let mut rng = thread_rng(); + match rng.gen_range(0..8) { + 0 => Arc::new(generate_primitive_array::(len, 0.8)), + 1 => Arc::new(generate_primitive_array::(len, 0.8)), + 2 => Arc::new(generate_primitive_array::(len, 0.8)), + 3 => Arc::new(generate_primitive_array::(len, 0.8)), + 4 => Arc::new(generate_primitive_array::(len, 0.8)), + 5 => Arc::new(generate_primitive_array::(len, 0.8)), + 6 => Arc::new(generate_strings::(len, 0.8)), + 7 => Arc::new(generate_dictionary::( + Arc::new(generate_strings::(rng.gen_range(1..len), 0.9)), + len, + 0.8, + )), + _ => unreachable!(), + } + } + + fn print_row(cols: &[SortColumn], row: usize) -> String { + let t: Vec<_> = cols + .iter() + .map(|x| array_value_to_string(&x.values, row).unwrap()) + .collect(); + t.join(",") + } + + fn print_col_types(cols: &[SortColumn]) -> String { + let t: Vec<_> = cols + .iter() + .map(|x| x.values.data_type().to_string()) + .collect(); + t.join(",") + } + + #[test] + fn fuzz_test() { + for _ in 0..100 { + let mut rng = thread_rng(); + let num_columns = rng.gen_range(1..5); + let len = rng.gen_range(5..100); + let columns: Vec<_> = + (0..num_columns).map(|_| generate_column(len)).collect(); + + let options: Vec<_> = (0..num_columns) + .map(|_| SortOptions { + descending: rng.gen_bool(0.5), + nulls_first: rng.gen_bool(0.5), + }) + .collect(); + + let sort_columns: Vec<_> = options + .iter() + .zip(&columns) + .map(|(o, c)| SortColumn { + values: Arc::clone(c), + options: Some(*o), + }) + .collect(); + + let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); + + let mut converter = RowConverter::new(options); + let rows = converter.convert(&columns); + + for i in 0..len { + for j in 0..len { + let row_i = rows.row(i); + let row_j = rows.row(j); + let row_cmp = row_i.cmp(&row_j); + let lex_cmp = comparator.compare(&i, &j); + assert_eq!( + row_cmp, + lex_cmp, + "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}", + print_row(&sort_columns, i), + print_row(&sort_columns, j), + 
row_i, + row_j, + print_col_types(&sort_columns) + ); + } + } + } + } +} diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs new file mode 100644 index 00000000000..337fc751f89 --- /dev/null +++ b/arrow/src/row/variable.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::compute::SortOptions; +use crate::row::Rows; +use crate::util::bit_util::ceil; + +/// The block size of the variable length encoding +pub const BLOCK_SIZE: usize = 32; + +/// Returns the length of the encoded representation of a byte array +pub fn encoded_len(a: Option<&[u8]>) -> usize { + match a { + Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), + None => 1, + } +} + +/// Variable length values are encoded as +/// +/// - leading `0` bit if null otherwise `1` +/// - 31 bits big endian length +/// - length bytes of value data +pub fn encode<'a, I: Iterator>>( + out: &mut Rows, + i: I, + opts: SortOptions, +) { + for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) { + match maybe_val { + Some(val) if val.is_empty() => { + out.buffer[*offset] = match opts.descending { + true => !1, + false => 1, + }; + *offset += 1; + } + Some(val) => { + let block_count = ceil(val.len(), BLOCK_SIZE); + let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); + let to_write = &mut out.buffer[*offset..end_offset]; + + // Set validity + to_write[0] = 2; + + let chunks = val.chunks_exact(BLOCK_SIZE); + let remainder = chunks.remainder(); + for (input, output) in chunks + .clone() + .zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1)) + { + let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap(); + let out_block: &mut [u8; BLOCK_SIZE] = + (&mut output[..BLOCK_SIZE]).try_into().unwrap(); + + *out_block = *input; + output[BLOCK_SIZE] = u8::MAX; + } + + if !remainder.is_empty() { + let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1); + to_write[start_offset..start_offset + remainder.len()] + .copy_from_slice(remainder); + *to_write.last_mut().unwrap() = remainder.len() as u8; + } else { + *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; + } + + *offset = end_offset; + + if opts.descending { + to_write.iter_mut().for_each(|v| *v = !*v) + } + } + None => { + if !opts.nulls_first { + out.buffer[*offset] = 0xFF; + } + *offset += 1; + } + 
} + } +} From 9f0aaa27dba749f925876478ad335513df8e1e38 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 5 Sep 2022 21:37:11 +0100 Subject: [PATCH 2/8] Skip miri on heavier tests --- arrow/src/row/interner.rs | 1 + arrow/src/row/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/arrow/src/row/interner.rs b/arrow/src/row/interner.rs index e139d261f7a..578f7e2b919 100644 --- a/arrow/src/row/interner.rs +++ b/arrow/src/row/interner.rs @@ -277,6 +277,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn test_interner() { test_intern_values(&[8, 6, 5, 7]); diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index 0b1f1f7c83e..071522a23d6 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -697,6 +697,7 @@ mod tests { } #[test] + #[cfg_attr(miri, ignore)] fn fuzz_test() { for _ in 0..100 { let mut rng = thread_rng(); From 481d7a772f593b60d7d0271dd3a5429fcb5f06e5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 8 Sep 2022 14:54:45 +0100 Subject: [PATCH 3/8] Handle nulls in dictionary values --- arrow/src/row/interner.rs | 41 +++++++++++------- arrow/src/row/mod.rs | 90 +++++++++++++++++++++++++++++++-------- 2 files changed, 99 insertions(+), 32 deletions(-) diff --git a/arrow/src/row/interner.rs b/arrow/src/row/interner.rs index 578f7e2b919..4e80f4e8371 100644 --- a/arrow/src/row/interner.rs +++ b/arrow/src/row/interner.rs @@ -18,10 +18,11 @@ use hashbrown::hash_map::RawEntryMut; use hashbrown::HashMap; use std::cmp::Ordering; +use std::num::NonZeroU32; use std::ops::Index; #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub struct Interned(u32); +pub struct Interned(NonZeroU32); /// A byte array interner that generates normalized keys that are sorted with respect /// to the interned values, e.g. 
`inter(a) < intern(b) => a < b` @@ -39,9 +40,9 @@ impl OrderPreservingInterner { /// Interns an iterator of values returning a list of [`Interned`] which can be /// used with [`Self::normalized_key`] to retrieve the normalized keys with a /// lifetime not tied to the mutable borrow passed to this method - pub fn intern(&mut self, input: I) -> Vec + pub fn intern(&mut self, input: I) -> Vec> where - I: IntoIterator, + I: IntoIterator>, V: AsRef<[u8]>, { let iter = input.into_iter(); @@ -50,7 +51,15 @@ impl OrderPreservingInterner { let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); let mut to_intern_len = 0; - for (idx, value) in iter.enumerate() { + for (idx, item) in iter.enumerate() { + let value: V = match item { + Some(value) => value, + None => { + out.push(None); + continue; + } + }; + let v = value.as_ref(); let hash = self.hasher.hash_one(v); let entry = self @@ -59,10 +68,10 @@ impl OrderPreservingInterner { .from_hash(hash, |a| &self.values[*a] == v); match entry { - RawEntryMut::Occupied(o) => out.push(*o.key()), + RawEntryMut::Occupied(o) => out.push(Some(*o.key())), RawEntryMut::Vacant(_) => { // Push placeholder - out.push(Interned(0)); + out.push(None); to_intern_len += v.len(); to_intern.push((idx, hash, value)); } @@ -86,7 +95,7 @@ impl OrderPreservingInterner { match entry { RawEntryMut::Occupied(o) => { - out[idx] = *o.key(); + out[idx] = Some(*o.key()); } RawEntryMut::Vacant(v) => { let val = value.as_ref(); @@ -100,7 +109,7 @@ impl OrderPreservingInterner { v.insert_with_hasher(hash, interned, (), |key| { hasher.hash_one(&values[*key]) }); - out[idx] = interned; + out[idx] = Some(interned); } } } @@ -141,8 +150,8 @@ impl InternBuffer { /// Appends the next value based on data written to `self.values` /// returning the corresponding [`Interned`] fn append(&mut self) -> Interned { - let idx = self.offsets.len() - 1; - let key = Interned(idx.try_into().unwrap()); + let idx: u32 = self.offsets.len().try_into().unwrap(); + let 
key = Interned(NonZeroU32::new(idx).unwrap()); self.offsets.push(self.values.len()); key } @@ -152,9 +161,9 @@ impl Index for InternBuffer { type Output = [u8]; fn index(&self, key: Interned) -> &Self::Output { - let index = key.0 as usize; - let end = self.offsets[index + 1]; - let start = self.offsets[index]; + let index = key.0.get() as usize; + let end = self.offsets[index]; + let start = self.offsets[index - 1]; unsafe { self.values.get_unchecked(start..end) } } } @@ -255,7 +264,8 @@ mod tests { // Intern a single value at a time to check ordering let interned: Vec<_> = values .iter() - .flat_map(|v| interner.intern([&v.to_be_bytes()])) + .flat_map(|v| interner.intern([Some(&v.to_be_bytes())])) + .map(Option::unwrap) .collect(); let interned: Vec<_> = interned @@ -295,7 +305,8 @@ mod tests { let values = vec![0_u8, 1, 8, 4, 1, 0]; let mut interner = OrderPreservingInterner::default(); - let interned = interner.intern(values.iter().map(std::slice::from_ref)); + let interned = interner.intern(values.iter().map(std::slice::from_ref).map(Some)); + let interned: Vec<_> = interned.into_iter().map(Option::unwrap).collect(); assert_eq!(interned[0], interned[5]); assert_eq!(interned[1], interned[4]); diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index 071522a23d6..8ca29c54eff 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -167,7 +167,9 @@ impl RowConverter { let mapping: Vec<_> = compute_dictionary_mapping(interner, values) .into_iter() - .map(|interned| interner.normalized_key(interned)) + .map(|maybe_interned| { + maybe_interned.map(|interned| interner.normalized_key(interned)) + }) .collect(); Some(mapping) @@ -229,26 +231,25 @@ impl<'a> AsRef<[u8]> for Row<'a> { fn compute_dictionary_mapping( interner: &mut OrderPreservingInterner, values: &ArrayRef, -) -> Vec { +) -> Vec> { use fixed::FixedLengthEncoding; downcast_primitive_array! 
{ values => interner - .intern(values.iter().map(|x| x.map(|x| x.encode()) - .unwrap_or_default())), + .intern(values.iter().map(|x| x.map(|x| x.encode()))), DataType::Binary => { - let iter = as_generic_binary_array::(values).iter().map(|x| x.unwrap_or_default()); + let iter = as_generic_binary_array::(values).iter(); interner.intern(iter) } DataType::LargeBinary => { - let iter = as_generic_binary_array::(values).iter().map(|x| x.unwrap_or_default()); + let iter = as_generic_binary_array::(values).iter(); interner.intern(iter) } DataType::Utf8 => { - let iter = as_string_array(values).iter().map(|x| x.unwrap_or_default().as_bytes()); + let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes())); interner.intern(iter) } DataType::LargeUtf8 => { - let iter = as_largestring_array(values).iter().map(|x| x.unwrap_or_default().as_bytes()); + let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes())); interner.intern(iter) } t => unreachable!("dictionary value {} is not supported", t) @@ -256,7 +257,10 @@ fn compute_dictionary_mapping( } /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] -fn new_empty_rows(cols: &[ArrayRef], dictionaries: &[Option>]) -> Rows { +fn new_empty_rows( + cols: &[ArrayRef], + dictionaries: &[Option>>], +) -> Rows { let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); let mut lengths = vec![0; num_rows]; @@ -291,8 +295,8 @@ fn new_empty_rows(cols: &[ArrayRef], dictionaries: &[Option>]) -> Row array => { let dict = dict.as_ref().unwrap(); for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { - match v { - Some(v) => *length += dict[v as usize].len() + 1, + match v.and_then(|v| dict[v as usize]) { + Some(k) => *length += k.len() + 1, None => *length += 1, } } @@ -325,7 +329,7 @@ fn encode_column( out: &mut Rows, array: &ArrayRef, opts: SortOptions, - dictionary: Option<&[&[u8]]>, + dictionary: Option<&[Option<&[u8]>]>, ) { downcast_primitive_array! 
{ array => fixed::encode(out, array, opts), @@ -365,9 +369,8 @@ fn encode_column( array => { let dict = dictionary.unwrap(); for (offset, k) in out.offsets.iter_mut().skip(1).zip(array.keys()) { - match k { - Some(k) => { - let v = &dict[k as usize]; + match k.and_then(|k| dict[k as usize]) { + Some(v) => { let end_offset = *offset + 1 + v.len(); out.buffer[*offset] = 1; out.buffer[*offset+1..end_offset].copy_from_slice(v); @@ -396,7 +399,8 @@ mod tests { use super::*; use crate::array::{ BinaryArray, BooleanArray, DictionaryArray, Float32Array, GenericStringArray, - Int16Array, OffsetSizeTrait, PrimitiveArray, StringArray, + Int16Array, Int32Array, Int32Builder, OffsetSizeTrait, PrimitiveArray, + PrimitiveDictionaryBuilder, StringArray, }; use crate::compute::{LexicographicalComparator, SortColumn}; use crate::util::display::array_value_to_string; @@ -551,7 +555,7 @@ mod tests { } #[test] - fn test_dictionary() { + fn test_string_dictionary() { let mut converter = RowConverter::new(vec![Default::default()]); let a = Arc::new(DictionaryArray::::from_iter([ @@ -599,6 +603,57 @@ mod tests { assert!(rows_c.row(3) > rows_c.row(0)); } + #[test] + fn test_primitive_dictionary() { + let mut converter = RowConverter::new(vec![Default::default()]); + + let mut builder = + PrimitiveDictionaryBuilder::new(Int32Builder::new(), Int32Builder::new()); + builder.append(2).unwrap(); + builder.append(3).unwrap(); + builder.append(0).unwrap(); + builder.append_null(); + builder.append(5).unwrap(); + builder.append(3).unwrap(); + builder.append(-1).unwrap(); + + let rows = converter.convert(&[Arc::new(builder.finish())]); + assert!(rows.row(0) < rows.row(1)); + assert!(rows.row(2) < rows.row(0)); + assert!(rows.row(3) < rows.row(2)); + assert!(rows.row(6) < rows.row(2)); + assert!(rows.row(3) < rows.row(6)); + } + + #[test] + fn test_dictionary_nulls() { + let mut converter = RowConverter::new(vec![Default::default()]); + + let values = + Int32Array::from_iter([Some(1), Some(-1), 
None, Some(4), None]).into_data(); + let keys = + Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]) + .into_data(); + + let data = keys + .into_builder() + .data_type(DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Int32), + )) + .child_data(vec![values]) + .build() + .unwrap(); + + let rows = + converter.convert(&[Arc::new(DictionaryArray::::from(data))]); + + assert_eq!(rows.row(0), rows.row(1)); + assert_eq!(rows.row(3), rows.row(4)); + assert_eq!(rows.row(4), rows.row(5)); + assert!(rows.row(3) < rows.row(0)); + } + fn generate_primitive_array(len: usize, valid_percent: f64) -> PrimitiveArray where K: ArrowPrimitiveType, @@ -663,6 +718,7 @@ mod tests { fn generate_column(len: usize) -> ArrayRef { let mut rng = thread_rng(); + // Cannot test PrimitiveDictionary because of #2687 match rng.gen_range(0..8) { 0 => Arc::new(generate_primitive_array::(len, 0.8)), 1 => Arc::new(generate_primitive_array::(len, 0.8)), From e20143b6df1c251e0ddf80706c819a0f25cad540 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 8 Sep 2022 15:16:42 +0100 Subject: [PATCH 4/8] Don't fuzz test dictionaries with null values --- arrow/src/row/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index 8ca29c54eff..6825c93b0ec 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -718,8 +718,7 @@ mod tests { fn generate_column(len: usize) -> ArrayRef { let mut rng = thread_rng(); - // Cannot test PrimitiveDictionary because of #2687 - match rng.gen_range(0..8) { + match rng.gen_range(0..9) { 0 => Arc::new(generate_primitive_array::(len, 0.8)), 1 => Arc::new(generate_primitive_array::(len, 0.8)), 2 => Arc::new(generate_primitive_array::(len, 0.8)), @@ -728,7 +727,17 @@ mod tests { 5 => Arc::new(generate_primitive_array::(len, 0.8)), 6 => Arc::new(generate_strings::(len, 0.8)), 7 => Arc::new(generate_dictionary::( - 
Arc::new(generate_strings::(rng.gen_range(1..len), 0.9)), + // Cannot test dictionaries containing null values because of #2687 + Arc::new(generate_strings::(rng.gen_range(1..len), 1.0)), + len, + 0.8, + )), + 8 => Arc::new(generate_dictionary::( + // Cannot test dictionaries containing null values because of #2687 + Arc::new(generate_primitive_array::( + rng.gen_range(1..len), + 1.0, + )), len, 0.8, )), From 4b7c3c09fd1994fb0386e84206175dbd9f2754c2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 9 Sep 2022 15:52:18 +0100 Subject: [PATCH 5/8] Add docs --- arrow/src/row/fixed.rs | 10 ++- arrow/src/row/interner.rs | 133 ++++++++++++++++++++++++++++++++++++-- arrow/src/row/mod.rs | 35 ++++++++-- arrow/src/row/variable.rs | 20 ++++-- 4 files changed, 184 insertions(+), 14 deletions(-) diff --git a/arrow/src/row/fixed.rs b/arrow/src/row/fixed.rs index 41444d769bf..1e5d3901a76 100644 --- a/arrow/src/row/fixed.rs +++ b/arrow/src/row/fixed.rs @@ -22,6 +22,8 @@ use crate::row::Rows; use crate::util::decimal::{Decimal128, Decimal256}; use half::f16; +/// Encodes a value of a particular fixed width type into bytes according to the rules +/// described on [`super::RowConverter`] pub trait FixedLengthEncoding: Copy { fn encode(self) -> [u8; N]; } @@ -37,7 +39,7 @@ macro_rules! 
encode_signed { impl FixedLengthEncoding<$n> for $t { fn encode(self) -> [u8; $n] { let mut b = self.to_be_bytes(); - // Toggle top bit + // Toggle top "sign" bit to ensure consistent sort order b[0] ^= 0x80; b } @@ -96,7 +98,9 @@ impl FixedLengthEncoding<8> for f64 { impl FixedLengthEncoding<16> for Decimal128 { fn encode(self) -> [u8; 16] { let mut val = *self.raw_value(); + // Convert to big endian representation val.reverse(); + // Toggle top "sign" bit to ensure consistent sort order val[0] ^= 0x80; val } @@ -105,12 +109,15 @@ impl FixedLengthEncoding<16> for Decimal128 { impl FixedLengthEncoding<32> for Decimal256 { fn encode(self) -> [u8; 32] { let mut val = *self.raw_value(); + // Convert to big endian representation val.reverse(); + // Toggle top "sign" bit to ensure consistent sort order val[0] ^= 0x80; val } } +/// Returns the total encoded length (including null byte) for a value of type `T::Native` pub const fn encoded_len(_col: &PrimitiveArray) -> usize { std::mem::size_of::() + 1 } @@ -135,6 +142,7 @@ pub fn encode< to_write[0] = 1; let mut encoded = val.encode(); if opts.descending { + // Flip bits to reverse order encoded.iter_mut().for_each(|v| *v = !*v) } to_write[1..].copy_from_slice(&encoded) diff --git a/arrow/src/row/interner.rs b/arrow/src/row/interner.rs index 4e80f4e8371..77edb97e8d1 100644 --- a/arrow/src/row/interner.rs +++ b/arrow/src/row/interner.rs @@ -21,17 +21,22 @@ use std::cmp::Ordering; use std::num::NonZeroU32; use std::ops::Index; +/// An interned value #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub struct Interned(NonZeroU32); +pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option` is 32 bits /// A byte array interner that generates normalized keys that are sorted with respect /// to the interned values, e.g. 
`inter(a) < intern(b) => a < b` #[derive(Debug, Default)] pub struct OrderPreservingInterner { + /// Provides a lookup from [`Interned`] to the normalized key keys: InternBuffer, + /// Provides a lookup from [`Interned`] to the normalized value values: InternBuffer, + /// Key allocation data structure bucket: Box, + // A hash table used to perform faster re-keying, and detect duplicates hasher: ahash::RandomState, lookup: HashMap, } @@ -48,6 +53,8 @@ impl OrderPreservingInterner { let iter = input.into_iter(); let capacity = iter.size_hint().0; let mut out = Vec::with_capacity(capacity); + + // (index in output, hash value, value) let mut to_intern: Vec<(usize, u64, V)> = Vec::with_capacity(capacity); let mut to_intern_len = 0; @@ -127,7 +134,9 @@ impl OrderPreservingInterner { /// A buffer of `[u8]` indexed by `[Interned]` #[derive(Debug)] struct InternBuffer { + /// Raw values values: Vec, + /// The ith value is `&values[offsets[i]..offsets[i+1]]` offsets: Vec, } @@ -164,6 +173,9 @@ impl Index for InternBuffer { let index = key.0.get() as usize; let end = self.offsets[index]; let start = self.offsets[index - 1]; + // SAFETY: + // self.values is never reduced in size and values appended + // to self.offsets are always less than self.values at the time unsafe { self.values.get_unchecked(start..end) } } } @@ -175,11 +187,122 @@ impl Index for InternBuffer { #[derive(Debug, Default, Clone)] struct Slot { value: Option, - /// Child values smaller than `self.value` if any + /// Child values less than `self.value` if any child: Option>, } -/// Each bucket corresponds to a single byte in the normalized key +/// Bucket is the root of the data-structure used to allocate normalized keys +/// +/// In particular it needs to generate keys that +/// +/// * Contain no `0` bytes other than the null terminator +/// * Compare lexicographically in the same manner as the encoded `data` +/// +/// The data structure consists of 255 slots, each of which can store a value. 
+/// Additionally each slot may contain a child bucket, containing values smaller +/// than the value within the slot +/// +/// # Allocation Strategy +/// +/// To find the insertion point within a Bucket we perform a binary search of the slots, but +/// capping the search range at 4. Visualizing this as a search tree, the root would have 64 +/// children, with subsequent non-leaf nodes each containing two children. +/// +/// The insertion point is the first empty slot we encounter, otherwise it is the first slot +/// that contains a value greater than the value being inserted +/// +/// For example, initially all slots are empty +/// +/// ```ignore +/// 0: +/// 1: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `1000` +/// +/// ```ignore +/// 0: +/// 1: +/// 2: +/// 3: 1000 <- 1. slot is empty, insert here +/// 4: +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `500` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. slot is empty, insert here +/// 2: +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `600` +/// +/// ```ignore +/// 0: +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is empty, insert here +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `400` +/// +/// ```ignore +/// 0: 400 <- 3. slot is empty, insert here +/// 1: 500 <- 2. compare against slot value +/// 2: 600 +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// Insert `700` +/// +/// ```ignore +/// 0: 400 +/// 1: 500 <- 2. compare against slot value +/// 2: 600 <- 3. slot is occupied and end of search +/// 3: 1000 <- 1. compare against slot value +/// 4. +/// . +/// . +/// 254: +/// ``` +/// +/// In this case we reach the end of our search and need to insert a value between +/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat +/// the process for that bucket. 
+/// +/// The final key will consist of the slot indexes visited incremented by 1, +/// with the final value incremented by 2, followed by a null terminator. +/// +/// So in the above example we would have +/// +/// ```ignore +/// 400: &[2, 0] +/// 500: &[3, 0] +/// 600: &[4, 0] +/// 700: &[4, 5, 0] +/// 1000: &[5, 0] +/// ``` +/// #[derive(Debug, Clone)] struct Bucket { slots: Box<[Slot]>, @@ -236,7 +359,9 @@ impl Bucket { Ok(_) => unreachable!("value already exists"), Err(idx) => { let slot = &mut self.slots[idx]; - // Must always spill final slot so can always create a value less than + // Cannot insert a value into slot 254 as would overflow byte, but also + // would prevent inserting any larger values, as the child bucket can + // only contain values less than the slot if idx != 254 && slot.value.is_none() { out.push(idx as u8 + 2); slot.value = Some(values_buf.insert(data)) diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index 6825c93b0ec..ea3a33851e2 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -120,7 +120,7 @@ mod variable; /// ## Reconstruction /// /// Given a schema it would theoretically be possible to reconstruct the columnar data from -/// the row format, however, this is currently not supported. It is recommended that the row +/// the row format, however, this is currently not implemented. 
It is recommended that the row /// format is instead used to obtain a sorted list of row indices, which can then be used /// with [`take`](crate::compute::take) to obtain a sorted [`Array`] /// @@ -132,12 +132,14 @@ mod variable; /// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] #[derive(Debug)] pub struct RowConverter { + /// Sort options for column `i` options: Vec, + /// interning state for column `i`, if column `i` is a dictionary dictionaries: Vec>>, } impl RowConverter { - /// Create a new [`RowConverter`] with the following schema and options + /// Create a new [`RowConverter`] with the following options pub fn new(options: Vec) -> Self { let dictionaries = (0..options.len()).map(|_| None).collect(); Self { @@ -146,7 +148,9 @@ impl RowConverter { } } - /// Convert `cols` into [`Rows`] + /// Convert [`ArrayRef`]s into [`Rows`] + /// + /// See [`Row`] for information on when [`Row`] can be compared /// /// # Panics /// @@ -201,7 +205,9 @@ impl RowConverter { /// See [`RowConverter`] #[derive(Debug)] pub struct Rows { + /// Underlying row bytes buffer: Box<[u8]>, + /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]` offsets: Box<[usize]>, } @@ -217,7 +223,13 @@ impl Rows { } } -/// A comparable representation of a row, see [`Rows`] +/// A comparable representation of a row +/// +/// Two [`Row`] can be compared if they both belong to [`Rows`] returned by calls to +/// [`RowConverter::convert`] on the same [`RowConverter`], with the same number of arrays, +/// with the data types of each array index remaining consistent. +/// +/// Otherwise any ordering established by comparing the [`Row`] is arbitrary #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Row<'a>(&'a [u8]); @@ -310,6 +322,21 @@ fn new_empty_rows( let mut offsets = Vec::with_capacity(num_rows + 1); offsets.push(0); + // We initialize the offsets shifted down by one row index. 
+ // + // As the rows are appended to the offsets will be incremented to match + // + // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. + // The offsets would be initialized to `0, 0, 3, 7` + // + // Writing the first row entirely would yield `0, 3, 3, 7` + // The second, `0, 3, 7, 7` + // The third, `0, 3, 7, 13` + // + // This would be the final offsets for reading + // + // In this way offsets tracks the position during writing whilst eventually serving + // as identifying the offsets of the written rows let mut cur_offset = 0_usize; for l in lengths { offsets.push(cur_offset); diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs index 337fc751f89..35b4e27d305 100644 --- a/arrow/src/row/variable.rs +++ b/arrow/src/row/variable.rs @@ -22,7 +22,7 @@ use crate::util::bit_util::ceil; /// The block size of the variable length encoding pub const BLOCK_SIZE: usize = 32; -/// Returns the length of the encoded representation of a byte array +/// Returns the length of the encoded representation of a byte array, including the null byte pub fn encoded_len(a: Option<&[u8]>) -> usize { match a { Some(a) => 1 + ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1), @@ -32,9 +32,15 @@ pub fn encoded_len(a: Option<&[u8]>) -> usize { /// Variable length values are encoded as /// -/// - leading `0` bit if null otherwise `1` -/// - 31 bits big endian length -/// - length bytes of value data +/// - single `0_u8` if null +/// - single `1_u8` if empty array +/// - `2_u8` if not empty, followed by one or more blocks +/// +/// where a block is encoded as +/// +/// - [`BLOCK_SIZE`] bytes of string data, padded with 0s +/// - `0xFF_u8` if this is not the last block for this string +/// - otherwise the length of the block as a `u8` pub fn encode<'a, I: Iterator>>( out: &mut Rows, i: I, @@ -54,7 +60,7 @@ pub fn encode<'a, I: Iterator>>( let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); let to_write = &mut out.buffer[*offset..end_offset]; - // 
Set validity + // Write validity byte to demarcate as non-empty string to_write[0] = 2; let chunks = val.chunks_exact(BLOCK_SIZE); @@ -68,6 +74,8 @@ pub fn encode<'a, I: Iterator>>( (&mut output[..BLOCK_SIZE]).try_into().unwrap(); *out_block = *input; + + // Indicate that there are further blocks to follow output[BLOCK_SIZE] = u8::MAX; } @@ -77,12 +85,14 @@ pub fn encode<'a, I: Iterator>>( .copy_from_slice(remainder); *to_write.last_mut().unwrap() = remainder.len() as u8; } else { + // We must overwrite the continuation marker written by the loop above *to_write.last_mut().unwrap() = BLOCK_SIZE as u8; } *offset = end_offset; if opts.descending { + // Invert bits to_write.iter_mut().for_each(|v| *v = !*v) } } From 400247a28d84886500f3daf30dc011ab32535f54 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 9 Sep 2022 16:06:19 +0100 Subject: [PATCH 6/8] Add error plumbing --- arrow/src/row/mod.rs | 60 +++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index ea3a33851e2..dec3d27bf97 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -23,6 +23,7 @@ use crate::array::{ }; use crate::compute::SortOptions; use crate::datatypes::*; +use crate::error::{ArrowError, Result}; use crate::row::interner::{Interned, OrderPreservingInterner}; use crate::{downcast_dictionary_array, downcast_primitive_array}; @@ -155,32 +156,32 @@ impl RowConverter { /// # Panics /// /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] - pub fn convert(&mut self, arrays: &[ArrayRef]) -> Rows { + pub fn convert(&mut self, arrays: &[ArrayRef]) -> Result { assert_eq!(arrays.len(), self.options.len(), "column count mismatch"); - let dictionaries: Vec<_> = arrays + let dictionaries = arrays .iter() .zip(&mut self.dictionaries) .map(|(array, dictionary)| { let values = downcast_dictionary_array! 
{ array => array.values(), - _ => return None + _ => return Ok(None) }; let interner = dictionary.get_or_insert_with(Default::default); - let mapping: Vec<_> = compute_dictionary_mapping(interner, values) + let mapping: Vec<_> = compute_dictionary_mapping(interner, values)? .into_iter() .map(|maybe_interned| { maybe_interned.map(|interned| interner.normalized_key(interned)) }) .collect(); - Some(mapping) + Ok(Some(mapping)) }) - .collect(); + .collect::>>()?; - let mut rows = new_empty_rows(arrays, &dictionaries); + let mut rows = new_empty_rows(arrays, &dictionaries)?; for ((array, options), dictionary) in arrays.iter().zip(&self.options).zip(dictionaries) @@ -196,7 +197,7 @@ impl RowConverter { .for_each(|w| assert!(w[0] < w[1], "offsets should be monotonic")); } - rows + Ok(rows) } } @@ -243,9 +244,9 @@ impl<'a> AsRef<[u8]> for Row<'a> { fn compute_dictionary_mapping( interner: &mut OrderPreservingInterner, values: &ArrayRef, -) -> Vec> { +) -> Result>> { use fixed::FixedLengthEncoding; - downcast_primitive_array! { + Ok(downcast_primitive_array! 
{ values => interner .intern(values.iter().map(|x| x.map(|x| x.encode()))), DataType::Binary => { @@ -264,15 +265,15 @@ fn compute_dictionary_mapping( let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes())); interner.intern(iter) } - t => unreachable!("dictionary value {} is not supported", t) - } + t => return Err(ArrowError::NotYetImplemented(format!("dictionary value {} is not supported", t))), + }) } /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] fn new_empty_rows( cols: &[ArrayRef], dictionaries: &[Option>>], -) -> Rows { +) -> Result { let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); let mut lengths = vec![0; num_rows]; @@ -315,7 +316,7 @@ fn new_empty_rows( } _ => unreachable!(), } - t => unimplemented!("not yet implemented: {}", t) + t => return Err(ArrowError::NotYetImplemented(format!("not yet implemented: {}", t))) } } @@ -345,10 +346,10 @@ fn new_empty_rows( let buffer = vec![0_u8; cur_offset]; - Rows { + Ok(Rows { buffer: buffer.into(), offsets: offsets.into(), - } + }) } /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses @@ -461,7 +462,7 @@ mod tests { let mut converter = RowConverter::new(vec![Default::default(), Default::default()]); - let rows = converter.convert(&cols); + let rows = converter.convert(&cols).unwrap(); assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]); assert_eq!( @@ -496,7 +497,7 @@ mod tests { let mut converter = RowConverter::new(vec![Default::default()]); let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); - let rows = converter.convert(&[col]); + let rows = converter.convert(&[col]).unwrap(); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(2) > rows.row(0)); assert!(rows.row(1) > rows.row(0)); @@ -507,7 +508,7 @@ mod tests { }]); let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); - let rows = converter.convert(&[col]); + let rows = 
converter.convert(&[col]).unwrap(); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(2) < rows.row(0)); assert!(rows.row(1) < rows.row(0)); @@ -524,7 +525,7 @@ mod tests { ])); let mut converter = RowConverter::new(vec![(Default::default())]); - let rows = converter.convert(&[col]); + let rows = converter.convert(&[col]).unwrap(); assert!(rows.row(1) < rows.row(0)); assert!(rows.row(2) < rows.row(4)); @@ -546,7 +547,7 @@ mod tests { ])) as ArrayRef; let mut converter = RowConverter::new(vec![Default::default()]); - let rows = converter.convert(&[Arc::clone(&col)]); + let rows = converter.convert(&[Arc::clone(&col)]).unwrap(); for i in 0..rows.num_rows() { for j in i + 1..rows.num_rows() { @@ -565,7 +566,7 @@ mod tests { descending: true, nulls_first: false, }]); - let rows = converter.convert(&[col]); + let rows = converter.convert(&[col]).unwrap(); for i in 0..rows.num_rows() { for j in i + 1..rows.num_rows() { @@ -596,7 +597,7 @@ mod tests { Some("hello"), ])) as ArrayRef; - let rows_a = converter.convert(&[Arc::clone(&a)]); + let rows_a = converter.convert(&[Arc::clone(&a)]).unwrap(); assert!(rows_a.row(3) < rows_a.row(5)); assert!(rows_a.row(2) < rows_a.row(1)); @@ -613,7 +614,7 @@ mod tests { Some("cupcakes"), ])); - let rows_b = converter.convert(&[b]); + let rows_b = converter.convert(&[b]).unwrap(); assert_eq!(rows_a.row(1), rows_b.row(0)); assert_eq!(rows_a.row(3), rows_b.row(1)); assert!(rows_b.row(2) < rows_a.row(0)); @@ -623,7 +624,7 @@ mod tests { nulls_first: false, }]); - let rows_c = converter.convert(&[a]); + let rows_c = converter.convert(&[a]).unwrap(); assert!(rows_c.row(3) > rows_c.row(5)); assert!(rows_c.row(2) > rows_c.row(1)); assert!(rows_c.row(0) > rows_c.row(1)); @@ -644,7 +645,7 @@ mod tests { builder.append(3).unwrap(); builder.append(-1).unwrap(); - let rows = converter.convert(&[Arc::new(builder.finish())]); + let rows = converter.convert(&[Arc::new(builder.finish())]).unwrap(); assert!(rows.row(0) < rows.row(1)); 
assert!(rows.row(2) < rows.row(0)); assert!(rows.row(3) < rows.row(2)); @@ -672,8 +673,9 @@ mod tests { .build() .unwrap(); - let rows = - converter.convert(&[Arc::new(DictionaryArray::::from(data))]); + let rows = converter + .convert(&[Arc::new(DictionaryArray::::from(data))]) + .unwrap(); assert_eq!(rows.row(0), rows.row(1)); assert_eq!(rows.row(3), rows.row(4)); @@ -817,7 +819,7 @@ mod tests { let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); let mut converter = RowConverter::new(options); - let rows = converter.convert(&columns); + let rows = converter.convert(&columns).unwrap(); for i in 0..len { for j in 0..len { From 5bba288e4a4d73ab86aed1eb408a2d591179391f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 10 Sep 2022 18:22:41 +0100 Subject: [PATCH 7/8] Review feedback --- arrow/benches/row_format.rs | 44 ++++++-- arrow/src/row/fixed.rs | 10 +- arrow/src/row/mod.rs | 215 ++++++++++++++++++++++-------------- arrow/src/row/variable.rs | 2 +- 4 files changed, 177 insertions(+), 94 deletions(-) diff --git a/arrow/benches/row_format.rs b/arrow/benches/row_format.rs index 5f554cc5737..2802aa6ece0 100644 --- a/arrow/benches/row_format.rs +++ b/arrow/benches/row_format.rs @@ -20,8 +20,8 @@ extern crate criterion; extern crate core; use arrow::array::ArrayRef; -use arrow::datatypes::{Int64Type, UInt64Type}; -use arrow::row::RowConverter; +use arrow::datatypes::{DataType, Int64Type, UInt64Type}; +use arrow::row::{RowConverter, SortField}; use arrow::util::bench_util::{create_primitive_array, create_string_array_with_len}; use criterion::{black_box, Criterion}; use std::sync::Arc; @@ -30,41 +30,59 @@ fn row_bench(c: &mut Criterion) { let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; c.bench_function("row_batch 4096 u64(0)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = 
RowConverter::new(vec![SortField::new(DataType::UInt64)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = vec![Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef]; c.bench_function("row_batch 4096 i64(0)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 10)) as ArrayRef]; c.bench_function("row_batch 4096 string(10, 0)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 30)) as ArrayRef]; c.bench_function("row_batch 4096 string(30, 0)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0., 100)) as ArrayRef]; c.bench_function("row_batch 4096 string(100, 0)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = vec![Arc::new(create_string_array_with_len::(4096, 0.5, 100)) as ArrayRef]; c.bench_function("row_batch 4096 string(100, 0.5)", |b| { - b.iter(|| black_box(RowConverter::new(vec![Default::default()]).convert(&cols))); + b.iter(|| { + let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]); + black_box(converter.convert_columns(&cols)) + }); }); let cols = [ @@ -74,11 
+92,19 @@ fn row_bench(c: &mut Criterion) { Arc::new(create_primitive_array::(4096, 0.)) as ArrayRef, ]; + let fields = [ + SortField::new(DataType::Utf8), + SortField::new(DataType::Utf8), + SortField::new(DataType::Utf8), + SortField::new(DataType::Int64), + ]; + c.bench_function( "row_batch 4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)", |b| { b.iter(|| { - black_box(RowConverter::new(vec![Default::default()]).convert(&cols)) + let mut converter = RowConverter::new(fields.to_vec()); + black_box(converter.convert_columns(&cols)) }); }, ); diff --git a/arrow/src/row/fixed.rs b/arrow/src/row/fixed.rs index 1e5d3901a76..78108274241 100644 --- a/arrow/src/row/fixed.rs +++ b/arrow/src/row/fixed.rs @@ -25,6 +25,8 @@ use half::f16; /// Encodes a value of a particular fixed width type into bytes according to the rules /// described on [`super::RowConverter`] pub trait FixedLengthEncoding: Copy { + const ENCODED_LEN: usize = 1 + N; + fn encode(self) -> [u8; N]; } @@ -118,8 +120,12 @@ impl FixedLengthEncoding<32> for Decimal256 { } /// Returns the total encoded length (including null byte) for a value of type `T::Native` -pub const fn encoded_len(_col: &PrimitiveArray) -> usize { - std::mem::size_of::() + 1 +pub const fn encoded_len(_col: &PrimitiveArray) -> usize +where + T: ArrowPrimitiveType, + T::Native: FixedLengthEncoding, +{ + T::Native::ENCODED_LEN } /// Fixed width types are encoded as diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index edfe5c166d5..5b51632a882 100644 --- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -25,13 +25,14 @@ use crate::compute::SortOptions; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::row::interner::{Interned, OrderPreservingInterner}; +use crate::util::decimal::{Decimal128, Decimal256}; use crate::{downcast_dictionary_array, downcast_primitive_array}; mod fixed; mod interner; mod variable; -/// Converts arrays into a row-oriented format that are [normalized for sorting]. 
+/// Converts [`ArrayRef`] columns into a row-oriented format that are [normalized for sorting]. /// /// In particular, a byte-wise comparison of the rows, e.g. [`memcmp`], is sufficient /// to establish the ordering of two rows, allowing for extremely fast comparisons, @@ -133,42 +134,74 @@ mod variable; /// [byte stuffing]:[https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing] #[derive(Debug)] pub struct RowConverter { - /// Sort options for column `i` - options: Vec, + fields: Vec, /// interning state for column `i`, if column`i` is a dictionary - dictionaries: Vec>>, + interners: Vec>>, +} + +/// Configure the data type and sort order for a given column +#[derive(Debug, Clone)] +pub struct SortField { + /// Sort options + options: SortOptions, + /// Data type + data_type: DataType, +} + +impl SortField { + /// Create a new column with the given data type + pub fn new(data_type: DataType) -> Self { + Self::new_with_options(data_type, Default::default()) + } + + /// Create a new column with the given data type and [`SortOptions`] + pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self { + Self { options, data_type } + } } impl RowConverter { - /// Create a new [`RowConverter`] with the following options - pub fn new(options: Vec) -> Self { - let dictionaries = (0..options.len()).map(|_| None).collect(); - Self { - dictionaries, - options, - } + /// Create a new [`RowConverter`] with the provided schema + pub fn new(fields: Vec) -> Self { + let interners = (0..fields.len()).map(|_| None).collect(); + Self { fields, interners } } - /// Convert [`ArrayRef`]s into [`Rows`] + /// Convert [`ArrayRef`] columns into [`Rows`] /// /// See [`Row`] for information on when [`Row`] can be compared /// /// # Panics /// - /// Panics if the schema of `cols` does not match that provided to [`RowConverter::new`] - pub fn convert(&mut self, arrays: &[ArrayRef]) -> Result { - assert_eq!(arrays.len(), self.options.len(), "column count 
mismatch"); + /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`] + pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result { + if columns.len() != self.fields.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Incorrect number of arrays provided to RowConverter, expected {} got {}", + self.fields.len(), + columns.len() + ))); + } - let dictionaries = arrays + let dictionaries = columns .iter() - .zip(&mut self.dictionaries) - .map(|(array, dictionary)| { + .zip(&mut self.interners) + .zip(&self.fields) + .map(|((column, interner), field)| { + if !column.data_type().equals_datatype(&field.data_type) { + return Err(ArrowError::InvalidArgumentError(format!( + "RowConverter column schema mismatch, expected {} got {}", + field.data_type, + column.data_type() + ))); + } + let values = downcast_dictionary_array! { - array => array.values(), + column => column.values(), _ => return Ok(None) }; - let interner = dictionary.get_or_insert_with(Default::default); + let interner = interner.get_or_insert_with(Default::default); let mapping: Vec<_> = compute_dictionary_mapping(interner, values)? 
.into_iter() @@ -181,13 +214,13 @@ impl RowConverter { }) .collect::>>()?; - let mut rows = new_empty_rows(arrays, &dictionaries)?; + let mut rows = new_empty_rows(columns, &dictionaries)?; - for ((array, options), dictionary) in - arrays.iter().zip(&self.options).zip(dictionaries) + for ((column, field), dictionary) in + columns.iter().zip(&self.fields).zip(dictionaries) { // We encode a column at a time to minimise dispatch overheads - encode_column(&mut rows, array, *options, dictionary.as_deref()) + encode_column(&mut rows, column, field.options, dictionary.as_deref()) } if cfg!(debug_assertions) { @@ -274,6 +307,8 @@ fn new_empty_rows( cols: &[ArrayRef], dictionaries: &[Option>>], ) -> Result { + use fixed::FixedLengthEncoding; + let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); let mut lengths = vec![0; num_rows]; @@ -281,9 +316,9 @@ fn new_empty_rows( downcast_primitive_array! { array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), DataType::Null => lengths.iter_mut().for_each(|x| *x += 1), - DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 2), - DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += 17), - DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += 33), + DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN), + DataType::Decimal128(_, _) => lengths.iter_mut().for_each(|x| *x += Decimal128::ENCODED_LEN), + DataType::Decimal256(_, _) => lengths.iter_mut().for_each(|x| *x += Decimal256::ENCODED_LEN), DataType::Binary => as_generic_binary_array::(array) .iter() .zip(lengths.iter_mut()) @@ -355,48 +390,48 @@ fn new_empty_rows( /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses fn encode_column( out: &mut Rows, - array: &ArrayRef, + column: &ArrayRef, opts: SortOptions, dictionary: Option<&[Option<&[u8]>]>, ) { downcast_primitive_array! 
{ - array => fixed::encode(out, array, opts), + column => fixed::encode(out, column, opts), DataType::Null => { - fixed::encode(out, std::iter::repeat(None::).take(array.len()), opts) + fixed::encode(out, std::iter::repeat(None::).take(column.len()), opts) } - DataType::Boolean => fixed::encode(out, as_boolean_array(array), opts), + DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts), DataType::Decimal128(_, _) => fixed::encode( out, - array.as_any().downcast_ref::().unwrap(), + column.as_any().downcast_ref::().unwrap(), opts, ), DataType::Decimal256(_, _) => fixed::encode( out, - array.as_any().downcast_ref::().unwrap(), + column.as_any().downcast_ref::().unwrap(), opts, ), DataType::Binary => { - variable::encode(out, as_generic_binary_array::(array).iter(), opts) + variable::encode(out, as_generic_binary_array::(column).iter(), opts) } DataType::LargeBinary => { - variable::encode(out, as_generic_binary_array::(array).iter(), opts) + variable::encode(out, as_generic_binary_array::(column).iter(), opts) } DataType::Utf8 => variable::encode( out, - as_string_array(array).iter().map(|x| x.map(|x| x.as_bytes())), + as_string_array(column).iter().map(|x| x.map(|x| x.as_bytes())), opts, ), DataType::LargeUtf8 => variable::encode( out, - as_largestring_array(array) + as_largestring_array(column) .iter() .map(|x| x.map(|x| x.as_bytes())), opts, ), DataType::Dictionary(_, _) => downcast_dictionary_array! 
{ - array => { + column => { let dict = dictionary.unwrap(); - for (offset, k) in out.offsets.iter_mut().skip(1).zip(array.keys()) { + for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) { match k.and_then(|k| dict[k as usize]) { Some(v) => { let end_offset = *offset + 1 + v.len(); @@ -460,9 +495,11 @@ mod tests { ])) as ArrayRef, ]; - let mut converter = - RowConverter::new(vec![Default::default(), Default::default()]); - let rows = converter.convert(&cols).unwrap(); + let mut converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]); + let rows = converter.convert_columns(&cols).unwrap(); assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]); assert_eq!( @@ -494,21 +531,24 @@ mod tests { #[test] fn test_bool() { - let mut converter = RowConverter::new(vec![Default::default()]); + let mut converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]); let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); - let rows = converter.convert(&[col]).unwrap(); + let rows = converter.convert_columns(&[col]).unwrap(); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(2) > rows.row(0)); assert!(rows.row(1) > rows.row(0)); - let mut converter = RowConverter::new(vec![SortOptions { - descending: true, - nulls_first: false, - }]); + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Boolean, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])); - let rows = converter.convert(&[col]).unwrap(); + let rows = converter.convert_columns(&[col]).unwrap(); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(2) < rows.row(0)); assert!(rows.row(1) < rows.row(0)); @@ -524,8 +564,8 @@ mod tests { Some(""), ])); - let mut converter = RowConverter::new(vec![(Default::default())]); - let rows = converter.convert(&[col]).unwrap(); + let mut converter 
= RowConverter::new(vec![SortField::new(DataType::Utf8)]); + let rows = converter.convert_columns(&[col]).unwrap(); assert!(rows.row(1) < rows.row(0)); assert!(rows.row(2) < rows.row(4)); @@ -546,8 +586,8 @@ mod tests { Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]), ])) as ArrayRef; - let mut converter = RowConverter::new(vec![Default::default()]); - let rows = converter.convert(&[Arc::clone(&col)]).unwrap(); + let mut converter = RowConverter::new(vec![SortField::new(DataType::Binary)]); + let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap(); for i in 0..rows.num_rows() { for j in i + 1..rows.num_rows() { @@ -562,11 +602,14 @@ mod tests { } } - let mut converter = RowConverter::new(vec![SortOptions { - descending: true, - nulls_first: false, - }]); - let rows = converter.convert(&[col]).unwrap(); + let mut converter = RowConverter::new(vec![SortField::new_with_options( + DataType::Binary, + SortOptions { + descending: true, + nulls_first: false, + }, + )]); + let rows = converter.convert_columns(&[col]).unwrap(); for i in 0..rows.num_rows() { for j in i + 1..rows.num_rows() { @@ -584,8 +627,6 @@ mod tests { #[test] fn test_string_dictionary() { - let mut converter = RowConverter::new(vec![Default::default()]); - let a = Arc::new(DictionaryArray::::from_iter([ Some("foo"), Some("hello"), @@ -597,7 +638,9 @@ mod tests { Some("hello"), ])) as ArrayRef; - let rows_a = converter.convert(&[Arc::clone(&a)]).unwrap(); + let mut converter = + RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap(); assert!(rows_a.row(3) < rows_a.row(5)); assert!(rows_a.row(2) < rows_a.row(1)); @@ -614,17 +657,20 @@ mod tests { Some("cupcakes"), ])); - let rows_b = converter.convert(&[b]).unwrap(); + let rows_b = converter.convert_columns(&[b]).unwrap(); assert_eq!(rows_a.row(1), rows_b.row(0)); assert_eq!(rows_a.row(3), rows_b.row(1)); assert!(rows_b.row(2) < rows_a.row(0)); - let mut converter = 
RowConverter::new(vec![SortOptions { - descending: true, - nulls_first: false, - }]); + let mut converter = RowConverter::new(vec![SortField::new_with_options( + a.data_type().clone(), + SortOptions { + descending: true, + nulls_first: false, + }, + )]); - let rows_c = converter.convert(&[a]).unwrap(); + let rows_c = converter.convert_columns(&[a]).unwrap(); assert!(rows_c.row(3) > rows_c.row(5)); assert!(rows_c.row(2) > rows_c.row(1)); assert!(rows_c.row(0) > rows_c.row(1)); @@ -633,8 +679,6 @@ mod tests { #[test] fn test_primitive_dictionary() { - let mut converter = RowConverter::new(vec![Default::default()]); - let mut builder = PrimitiveDictionaryBuilder::::new(); builder.append(2).unwrap(); builder.append(3).unwrap(); @@ -644,7 +688,11 @@ mod tests { builder.append(3).unwrap(); builder.append(-1).unwrap(); - let rows = converter.convert(&[Arc::new(builder.finish())]).unwrap(); + let a = builder.finish(); + + let mut converter = + RowConverter::new(vec![SortField::new(a.data_type().clone())]); + let rows = converter.convert_columns(&[Arc::new(a)]).unwrap(); assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) < rows.row(0)); assert!(rows.row(3) < rows.row(2)); @@ -654,26 +702,24 @@ mod tests { #[test] fn test_dictionary_nulls() { - let mut converter = RowConverter::new(vec![Default::default()]); - let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data(); let keys = Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]) .into_data(); + let data_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32)); let data = keys .into_builder() - .data_type(DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Int32), - )) + .data_type(data_type.clone()) .child_data(vec![values]) .build() .unwrap(); + let mut converter = RowConverter::new(vec![SortField::new(data_type)]); let rows = converter - .convert(&[Arc::new(DictionaryArray::::from(data))]) + 
.convert_columns(&[Arc::new(DictionaryArray::::from(data))]) .unwrap(); assert_eq!(rows.row(0), rows.row(1)); @@ -796,8 +842,7 @@ mod tests { let mut rng = thread_rng(); let num_columns = rng.gen_range(1..5); let len = rng.gen_range(5..100); - let columns: Vec<_> = - (0..num_columns).map(|_| generate_column(len)).collect(); + let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect(); let options: Vec<_> = (0..num_columns) .map(|_| SortOptions { @@ -808,7 +853,7 @@ mod tests { let sort_columns: Vec<_> = options .iter() - .zip(&columns) + .zip(&arrays) .map(|(o, c)| SortColumn { values: Arc::clone(c), options: Some(*o), @@ -817,8 +862,14 @@ mod tests { let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); - let mut converter = RowConverter::new(options); - let rows = converter.convert(&columns).unwrap(); + let columns = options + .into_iter() + .zip(&arrays) + .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) + .collect(); + + let mut converter = RowConverter::new(columns); + let rows = converter.convert_columns(&arrays).unwrap(); for i in 0..len { for j in 0..len { diff --git a/arrow/src/row/variable.rs b/arrow/src/row/variable.rs index 35b4e27d305..2213dad9e78 100644 --- a/arrow/src/row/variable.rs +++ b/arrow/src/row/variable.rs @@ -60,7 +60,7 @@ pub fn encode<'a, I: Iterator>>( let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1); let to_write = &mut out.buffer[*offset..end_offset]; - // Write validity byte to demarcate as non-empty string + // Write `2_u8` to demarcate as non-empty, non-null string to_write[0] = 2; let chunks = val.chunks_exact(BLOCK_SIZE); From 2b0552fe4538a0656d55f06d31475024af0ecdfd Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 10 Sep 2022 20:38:35 +0100 Subject: [PATCH 8/8] Fix docs --- arrow/src/row/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arrow/src/row/mod.rs b/arrow/src/row/mod.rs index 5b51632a882..88c8a916663 100644 
--- a/arrow/src/row/mod.rs +++ b/arrow/src/row/mod.rs @@ -260,8 +260,7 @@ impl Rows { /// A comparable representation of a row /// /// Two [`Row`] can be compared if they both belong to [`Rows`] returned by calls to -/// [`RowConvert::converter`] on the same [`RowConverter`], with the same number of arrays, -/// with the data types of each array index remaining consistent. +/// [`RowConverter::convert_columns`] on the same [`RowConverter`] /// /// Otherwise any ordering established by comparing the [`Row`] is arbitrary #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]