diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 458e0e0a149..ee1450c9c59 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -35,6 +35,7 @@ on:
       - arrow-ipc/**
       - arrow-json/**
       - arrow-ord/**
+      - arrow-row/**
       - arrow-schema/**
       - arrow-select/**
       - arrow-string/**
@@ -76,6 +77,8 @@ jobs:
         run: cargo test -p arrow-string --all-features
       - name: Test arrow-ord with all features except SIMD
         run: cargo test -p arrow-ord --features dyn_cmp_dict
+      - name: Test arrow-row with all features
+        run: cargo test -p arrow-row --all-features
       - name: Test arrow-integration-test with all features
         run: cargo test -p arrow-integration-test --all-features
       - name: Test arrow with default features
@@ -196,5 +199,7 @@ jobs:
         run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings
       - name: Clippy arrow-ord with all features except SIMD
         run: cargo clippy -p arrow-ord --all-targets --features dyn_cmp_dict -- -D warnings
+      - name: Clippy arrow-row with all features
+        run: cargo clippy -p arrow-row --all-targets --all-features -- -D warnings
       - name: Clippy arrow
         run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 526106bfe7c..0975c11d52f 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -33,11 +33,12 @@ on:
       - arrow-integration-test/**
       - arrow-integration-testing/**
       - arrow-ipc/**
-      - arrow-ord/**
       - arrow-json/**
+      - arrow-ord/**
       - arrow-pyarrow-integration-testing/**
+      - arrow-row/**
       - arrow-schema/**
      - arrow-select/**
       - arrow-string/**
       - arrow/**
diff --git a/Cargo.toml b/Cargo.toml
index c123106c6f7..fb072f7d346 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ members = [
     "arrow-ipc",
     "arrow-json",
     "arrow-ord",
+    "arrow-row",
     "arrow-schema",
     "arrow-select",
     "arrow-string",
diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index d6a9ab30b85..fe07ced1327 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -183,6 +183,25 @@ pub mod timezone;
 mod trusted_len;
 pub mod types;
 
+/// Options that define how sort kernels should behave
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct SortOptions {
+    /// Whether to sort in descending order
+    pub descending: bool,
+    /// Whether to sort nulls first
+    pub nulls_first: bool,
+}
+
+impl Default for SortOptions {
+    fn default() -> Self {
+        Self {
+            descending: false,
+            // default to nulls first to match spark's behavior
+            nulls_first: true,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::builder::*;
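With `SortOptions` now defined in `arrow-array`, code that only needs to describe an ordering no longer has to pull in the sort kernels. A minimal sketch of the defaults added above (illustrative only, not part of the diff):

```rust
use arrow_array::SortOptions;

fn main() {
    // The Default impl matches Spark: ascending, nulls first
    let opts = SortOptions::default();
    assert!(!opts.descending);
    assert!(opts.nulls_first);

    // e.g. ORDER BY ... DESC NULLS LAST
    let desc_nulls_last = SortOptions {
        descending: true,
        nulls_first: false,
    };
    assert!(desc_nulls_last.descending && !desc_nulls_last.nulls_first);
}
```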
diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs
index a2035988fe2..0028655d8f6 100644
--- a/arrow-ord/src/sort.rs
+++ b/arrow-ord/src/sort.rs
@@ -27,6 +27,8 @@ use arrow_schema::{ArrowError, DataType, IntervalUnit, TimeUnit};
 use arrow_select::take::take;
 use std::cmp::Ordering;
 
+pub use arrow_array::SortOptions;
+
 /// Sort the `ArrayRef` using `SortOptions`.
 ///
 /// Performs a sort on values and indices. Nulls are ordered according
@@ -366,25 +368,6 @@ pub fn sort_to_indices(
     })
 }
 
-/// Options that define how sort kernels should behave
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub struct SortOptions {
-    /// Whether to sort in descending order
-    pub descending: bool,
-    /// Whether to sort nulls first
-    pub nulls_first: bool,
-}
-
-impl Default for SortOptions {
-    fn default() -> Self {
-        Self {
-            descending: false,
-            // default to nulls first to match spark's behavior
-            nulls_first: true,
-        }
-    }
-}
-
 /// Sort boolean values
 ///
 /// when a limit is present, the sort is pair-comparison based as k-select might be more efficient,
diff --git a/arrow-row/Cargo.toml b/arrow-row/Cargo.toml
new file mode 100644
index 00000000000..4741c9d5840
--- /dev/null
+++ b/arrow-row/Cargo.toml
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-row"
+version = "29.0.0"
+description = "Arrow row format"
+homepage = "https://github.com/apache/arrow-rs"
+repository = "https://github.com/apache/arrow-rs"
+authors = ["Apache Arrow <dev@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = ["arrow"]
+include = [
+    "benches/*.rs",
+    "src/**/*.rs",
+    "Cargo.toml",
+]
+edition = "2021"
+rust-version = "1.62"
+
+[lib]
+name = "arrow_row"
+path = "src/lib.rs"
+bench = false
+
+[target.'cfg(target_arch = "wasm32")'.dependencies]
+ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] }
+
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
+
+[dependencies]
+arrow-array = { version = "29.0.0", path = "../arrow-array" }
+arrow-buffer = { version = "29.0.0", path = "../arrow-buffer" }
+arrow-data = { version = "29.0.0", path = "../arrow-data" }
+arrow-schema = { version = "29.0.0", path = "../arrow-schema" }
+
+half = { version = "2.1", default-features = false }
+hashbrown = { version = "0.13", default-features = false }
+
+[dev-dependencies]
+arrow-cast = { version = "29.0.0", path = "../arrow-cast" }
+arrow-ord = { version = "29.0.0", path = "../arrow-ord" }
+rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
+
+[features]
diff --git a/arrow/src/row/dictionary.rs b/arrow-row/src/dictionary.rs
similarity index 97%
rename from arrow/src/row/dictionary.rs
rename to arrow-row/src/dictionary.rs
index 82169a37d35..0768665a811 100644
--- a/arrow/src/row/dictionary.rs
+++ b/arrow-row/src/dictionary.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::compute::SortOptions;
-use crate::row::fixed::{FixedLengthEncoding, FromSlice};
-use crate::row::interner::{Interned, OrderPreservingInterner};
-use crate::row::{null_sentinel, Rows};
+use crate::fixed::{FixedLengthEncoding, FromSlice};
+use crate::interner::{Interned, OrderPreservingInterner};
+use crate::{null_sentinel, Rows};
 use arrow_array::builder::*;
 use arrow_array::cast::*;
 use arrow_array::types::*;
diff --git a/arrow/src/row/fixed.rs b/arrow-row/src/fixed.rs
similarity index 98%
rename from arrow/src/row/fixed.rs
rename to arrow-row/src/fixed.rs
index 03c53c99479..0802ca2c66f 100644
--- a/arrow/src/row/fixed.rs
+++ b/arrow-row/src/fixed.rs
@@ -16,11 +16,8 @@
 // under the License.
 
-use crate::array::PrimitiveArray;
-use crate::compute::SortOptions;
-use crate::datatypes::ArrowPrimitiveType;
-use crate::row::{null_sentinel, Rows};
+use crate::{null_sentinel, Rows};
 use arrow_array::builder::BufferBuilder;
-use arrow_array::{BooleanArray, FixedSizeBinaryArray};
+use arrow_array::{ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray, PrimitiveArray, SortOptions};
 use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
 use arrow_data::{ArrayData, ArrayDataBuilder};
 use arrow_schema::DataType;
diff --git a/arrow/src/row/interner.rs b/arrow-row/src/interner.rs
similarity index 100%
rename from arrow/src/row/interner.rs
rename to arrow-row/src/interner.rs
diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
new file mode 100644
index 00000000000..23dd7fe482c
--- /dev/null
+++ b/arrow-row/src/lib.rs
@@ -0,0 +1,2170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A comparable row-oriented representation of a collection of [`Array`].
+//!
+//! [`Row`]s are [normalized for sorting], and can therefore be very efficiently [compared],
+//! using [`memcmp`] under the hood, or used in [non-comparison sorts] such as [radix sort].
+//! This makes the row format ideal for implementing efficient multi-column sorting,
+//! grouping, aggregation, windowing and more, as described in more detail
+//! [here](https://arrow.apache.org/blog/2022/11/07/multi-column-sorts-in-arrow-rust-part-1/).
+//!
+//! For example, given three input [`Array`], [`RowConverter`] creates byte
+//! sequences that [compare] the same as when using [`lexsort`].
+//!
+//! ```text
+//! ┌─────┐   ┌─────┐   ┌─────┐
+//! │     │   │     │   │     │
+//! ├─────┤ ┌ ┼─────┼ ─ ┼─────┼ ┐              ┏━━━━━━━━━━━━━┓
+//! │     │   │     │   │     │  ─────────────▶┃             ┃
+//! ├─────┤ └ ┼─────┼ ─ ┼─────┼ ┘              ┗━━━━━━━━━━━━━┛
+//! │     │   │     │   │     │
+//! └─────┘   └─────┘   └─────┘
+//!                ...
+//! ┌─────┐ ┌ ┬─────┬ ─ ┬─────┬ ┐              ┏━━━━━━━━┓
+//! │     │   │     │   │     │  ─────────────▶┃        ┃
+//! └─────┘ └ ┴─────┴ ─ ┴─────┴ ┘              ┗━━━━━━━━┛
+//! UInt64      Utf8       F64
+//!
+//!           Input Arrays                     Row Format
+//!            (Columns)
+//! ```
+//!
+//! _[`Rows`] must be generated by the same [`RowConverter`] for the comparison
+//! to be meaningful._
+//!
+//! # Basic Example
+//! ```
+//! # use std::sync::Arc;
+//! # use arrow_row::{RowConverter, SortField};
+//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
+//! # use arrow_array::cast::{as_primitive_array, as_string_array};
+//! # use arrow_array::types::Int32Type;
+//! # use arrow_schema::DataType;
+//!
+//! let a1 = Arc::new(Int32Array::from_iter_values([-1, -1, 0, 3, 3])) as ArrayRef;
+//! let a2 = Arc::new(StringArray::from_iter_values(["a", "b", "c", "d", "d"])) as ArrayRef;
+//! let arrays = vec![a1, a2];
+//!
+//! // Convert arrays to rows
+//! let mut converter = RowConverter::new(vec![
+//!     SortField::new(DataType::Int32),
+//!     SortField::new(DataType::Utf8),
+//! ]).unwrap();
+//! let rows = converter.convert_columns(&arrays).unwrap();
+//!
+//! // Compare rows
+//! for i in 0..4 {
+//!     assert!(rows.row(i) <= rows.row(i + 1));
+//! }
+//! assert_eq!(rows.row(3), rows.row(4));
+//!
+//! // Convert rows back to arrays
+//! let converted = converter.convert_rows(&rows).unwrap();
+//! assert_eq!(arrays, converted);
+//!
+//! // Compare rows from different arrays
+//! let a1 = Arc::new(Int32Array::from_iter_values([3, 4])) as ArrayRef;
+//! let a2 = Arc::new(StringArray::from_iter_values(["e", "f"])) as ArrayRef;
+//! let arrays = vec![a1, a2];
+//! let rows2 = converter.convert_columns(&arrays).unwrap();
+//!
+//! assert!(rows.row(4) < rows2.row(0));
+//! assert!(rows.row(4) < rows2.row(1));
+//!
+//! // Convert selection of rows back to arrays
+//! let selection = [rows.row(0), rows2.row(1), rows.row(2), rows2.row(0)];
+//! let converted = converter.convert_rows(selection).unwrap();
+//! let c1 = as_primitive_array::<Int32Type>(converted[0].as_ref());
+//! assert_eq!(c1.values(), &[-1, 4, 0, 3]);
+//!
+//! let c2 = as_string_array(converted[1].as_ref());
+//! let c2_values: Vec<_> = c2.iter().flatten().collect();
+//! assert_eq!(&c2_values, &["a", "f", "c", "e"]);
+//! ```
+//!
+//! # Lexsort
+//!
+//! The row format can also be used to implement a fast multi-column / lexicographic sort
+//!
+//! ```
+//! # use arrow_row::{RowConverter, SortField};
+//! # use arrow_array::{ArrayRef, UInt32Array};
+//! fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array {
+//!     let fields = arrays
+//!         .iter()
+//!         .map(|a| SortField::new(a.data_type().clone()))
+//!         .collect();
+//!     let mut converter = RowConverter::new(fields).unwrap();
+//!     let rows = converter.convert_columns(arrays).unwrap();
+//!     let mut sort: Vec<_> = rows.iter().enumerate().collect();
+//!     sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+//!     UInt32Array::from_iter_values(sort.iter().map(|(i, _)| *i as u32))
+//! }
+//! ```
+//!
+//! [non-comparison sorts]: https://en.wikipedia.org/wiki/Sorting_algorithm#Non-comparison_sorts
+//! [radix sort]: https://en.wikipedia.org/wiki/Radix_sort
+//! [normalized for sorting]: https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.83.1080&rep=rep1&type=pdf
+//! [`memcmp`]: https://www.man7.org/linux/man-pages/man3/memcmp.3.html
+//! [`lexsort`]: https://docs.rs/arrow-ord/latest/arrow_ord/sort/fn.lexsort.html
+//! [compared]: PartialOrd
+//! [compare]: PartialOrd
+
+use std::cmp::Ordering;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow_array::cast::*;
+use arrow_array::*;
+use arrow_buffer::ArrowNativeType;
+use arrow_data::ArrayDataBuilder;
+use arrow_schema::*;
+
+use crate::dictionary::{
+    compute_dictionary_mapping, decode_dictionary, encode_dictionary,
+};
+use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
+use crate::interner::OrderPreservingInterner;
+use crate::variable::{decode_binary, decode_string};
+
+mod dictionary;
+mod fixed;
+mod interner;
+mod list;
+mod variable;
+
+/// Converts [`ArrayRef`] columns into a [row-oriented](self) format.
+///
+/// *Note: The encoding of the row format may change from release to release.*
+///
+/// ## Overview
+///
+/// The row format is a variable length byte sequence created by
+/// concatenating the encoded form of each column. The encoding for
+/// each column depends on its datatype (and sort options).
+///
+/// The encoding is carefully designed in such a way that escaping is
+/// unnecessary: it is never ambiguous as to whether a byte is part of
+/// a sentinel (e.g. null) or a value.
+///
+/// ## Unsigned Integer Encoding
+///
+/// A null integer is encoded as a `0_u8`, followed by a zero-ed number of bytes corresponding
+/// to the integer's length.
+///
+/// A valid integer is encoded as `1_u8`, followed by the big-endian representation of the
+/// integer.
+///
+/// ```text
+///            ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
+///    3       │03│00│00│00│      │01│00│00│00│03│
+///            └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
+///            ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
+///    258     │02│01│00│00│      │01│00│00│01│02│
+///            └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
+///            ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
+///    23423   │7F│5B│00│00│      │01│00│00│5B│7F│
+///            └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
+///            ┌──┬──┬──┬──┐      ┌──┬──┬──┬──┬──┐
+///    NULL    │??│??│??│??│      │00│00│00│00│00│
+///            └──┴──┴──┴──┘      └──┴──┴──┴──┴──┘
+///
+///           32-bit (4 bytes)      Row Format
+///  Value    Little Endian
+/// ```
+///
+/// ## Signed Integer Encoding
+///
+/// Signed integers have their most significant sign bit flipped, and are then encoded in the
+/// same manner as an unsigned integer.
+///
+/// ```text
+///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
+///     5  │05│00│00│00│       │05│00│00│80│       │01│80│00│00│05│
+///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
+///        ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┐       ┌──┬──┬──┬──┬──┐
+///    -5  │FB│FF│FF│FF│       │FB│FF│FF│7F│       │01│7F│FF│FF│FB│
+///        └──┴──┴──┴──┘       └──┴──┴──┴──┘       └──┴──┴──┴──┴──┘
+///
+///  Value  32-bit (4 bytes)    High bit flipped      Row Format
+///         Little Endian
+/// ```
+///
+/// ## Float Encoding
+///
+/// Floats are converted from IEEE 754 representation to a signed integer representation
+/// by flipping all bar the sign bit if they are negative.
+///
+/// They are then encoded in the same manner as a signed integer.
+///
+/// ## Fixed Length Bytes Encoding
+///
+/// Fixed length bytes are encoded in the same fashion as primitive types above.
+///
+/// For a fixed length array of length `n`:
+///
+/// A null is encoded as `0_u8` null sentinel followed by `n` `0_u8` bytes
+///
+/// A valid value is encoded as `1_u8` followed by the value bytes
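To make the fixed-width scheme above concrete, here is a minimal sketch of the nullable 32-bit signed integer encoding; the `encode_i32` helper is hypothetical and not part of this crate:

```rust
/// Hypothetical helper: encode an optional i32 using the scheme above.
fn encode_i32(v: Option<i32>) -> [u8; 5] {
    match v {
        // Null sentinel followed by a zeroed payload
        None => [0; 5],
        Some(v) => {
            // Flip the sign bit so signed values order like unsigned ones,
            // then use big-endian so a byte-wise comparison orders correctly
            let b = ((v as u32) ^ (1 << 31)).to_be_bytes();
            [1, b[0], b[1], b[2], b[3]]
        }
    }
}

fn main() {
    assert_eq!(encode_i32(Some(5)), [0x01, 0x80, 0x00, 0x00, 0x05]);
    assert_eq!(encode_i32(Some(-5)), [0x01, 0x7F, 0xFF, 0xFF, 0xFB]);
    assert!(encode_i32(None) < encode_i32(Some(i32::MIN))); // nulls sort first
}
```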
+/// ## Variable Length Bytes (including Strings) Encoding
+///
+/// A null is encoded as a `0_u8`.
+///
+/// An empty byte array is encoded as `1_u8`.
+///
+/// A non-null, non-empty byte array is encoded as `2_u8` followed by the byte array
+/// encoded using a block based scheme described below.
+///
+/// The byte array is broken up into 32-byte blocks, each block is written in turn
+/// to the output, followed by `0xFF_u8`. The final block is padded to 32-bytes
+/// with `0_u8` and written to the output, followed by the un-padded length in bytes
+/// of this final block as a `u8`.
+///
+/// Note the following example encodings use a block size of 4 bytes,
+/// as opposed to 32 bytes for brevity:
+///
+/// ```text
+///                    ┌───┬───┬───┬───┬───┬───┐
+///  "MEEP"            │02 │'M'│'E'│'E'│'P'│04 │
+///                    └───┴───┴───┴───┴───┴───┘
+///
+///                    ┌───┐
+///  ""                │01 │
+///                    └───┘
+///
+///  NULL              ┌───┐
+///                    │00 │
+///                    └───┘
+///
+/// "Defenestration"   ┌───┬───┬───┬───┬───┬───┐
+///                    │02 │'D'│'e'│'f'│'e'│FF │
+///                    └───┼───┼───┼───┼───┼───┤
+///                        │'n'│'e'│'s'│'t'│FF │
+///                        ├───┼───┼───┼───┼───┤
+///                        │'r'│'a'│'t'│'i'│FF │
+///                        ├───┼───┼───┼───┼───┤
+///                        │'o'│'n'│00 │00 │02 │
+///                        └───┴───┴───┴───┴───┘
+/// ```
+///
+/// This approach is loosely inspired by [COBS] encoding, and chosen over more traditional
+/// [byte stuffing] as it is more amenable to vectorisation, in particular AVX-256.
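A sketch of this block-based scheme, using the 4-byte blocks of the diagrams above rather than the real 32-byte blocks; the `encode_bytes` helper is hypothetical:

```rust
/// Hypothetical helper: encode an optional byte array using the scheme above.
fn encode_bytes(data: Option<&[u8]>) -> Vec<u8> {
    const BLOCK: usize = 4; // the real implementation uses 32
    match data {
        None => vec![0x00],     // null
        Some([]) => vec![0x01], // empty
        Some(v) => {
            let mut out = vec![0x02]; // non-null, non-empty sentinel
            let mut chunks = v.chunks(BLOCK).peekable();
            while let Some(chunk) = chunks.next() {
                if chunks.peek().is_some() {
                    // Non-final block: written verbatim, then a 0xFF marker
                    out.extend_from_slice(chunk);
                    out.push(0xFF);
                } else {
                    // Final block: zero-padded, then its un-padded length
                    let mut block = [0u8; BLOCK];
                    block[..chunk.len()].copy_from_slice(chunk);
                    out.extend_from_slice(&block);
                    out.push(chunk.len() as u8);
                }
            }
            out
        }
    }
}

fn main() {
    assert_eq!(encode_bytes(Some(b"MEEP")), vec![0x02, b'M', b'E', b'E', b'P', 0x04]);
    assert_eq!(encode_bytes(Some(b"")), vec![0x01]);
    assert_eq!(encode_bytes(None), vec![0x00]);
}
```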
+/// ## Dictionary Encoding
+///
+/// [`RowConverter`] needs to support converting dictionary encoded arrays with unsorted, and
+/// potentially distinct dictionaries. One simple mechanism to avoid this would be to reverse
+/// the dictionary encoding, and encode the array values directly, however, this would lose
+/// the benefits of dictionary encoding to reduce memory and CPU consumption.
+///
+/// As such the [`RowConverter`] creates an order-preserving mapping
+/// for each dictionary encoded column, which allows new dictionary
+/// values to be added whilst preserving the sort order.
+///
+/// A null dictionary value is encoded as `0_u8`.
+///
+/// A non-null dictionary value is encoded as `1_u8` followed by a null-terminated byte array
+/// key determined by the order-preserving dictionary encoding
+///
+/// ```text
+/// ┌──────────┐                 ┌─────┐
+/// │  "Bar"   │ ───────────────▶│ 01  │
+/// └──────────┘                 └─────┘
+/// ┌──────────┐                 ┌─────┬─────┐
+/// │"Fabulous"│ ───────────────▶│ 03  │ 05  │
+/// └──────────┘                 └─────┴─────┘
+/// ┌──────────┐                 ┌─────┐
+/// │  "Soup"  │ ───────────────▶│ 05  │
+/// └──────────┘                 └─────┘
+/// ┌──────────┐                 ┌─────┐
+/// │   "ZZ"   │ ───────────────▶│ 07  │
+/// └──────────┘                 └─────┘
+///
+///        Example Order Preserving Mapping
+/// ```
+/// Using the map above, the corresponding row format will be
+///
+/// ```text
+///                ┌─────┬─────┬─────┬─────┐
+///  "Fabulous"    │ 01  │ 03  │ 05  │ 00  │
+///                └─────┴─────┴─────┴─────┘
+///
+///                ┌─────┬─────┬─────┐
+///  "ZZ"          │ 01  │ 07  │ 00  │
+///                └─────┴─────┴─────┘
+///
+///                ┌─────┐
+///  NULL          │ 00  │
+///                └─────┘
+///
+///  Input         Row Format
+/// ```
+///
+/// ## Struct Encoding
+///
+/// A null is encoded as a `0_u8`.
+///
+/// A valid value is encoded as `1_u8` followed by the row encoding of each child.
+///
+/// This encoding effectively flattens the schema in a depth-first fashion.
+///
+/// For example
+///
+/// ```text
+/// ┌───────┬────────────────────────┬───────┐
+/// │ Int32 │ Struct[Int32, Float32] │ Int32 │
+/// └───────┴────────────────────────┴───────┘
+/// ```
+///
+/// Is encoded as
+///
+/// ```text
+/// ┌───────┬───────────────┬───────┬─────────┬───────┐
+/// │ Int32 │ Null Sentinel │ Int32 │ Float32 │ Int32 │
+/// └───────┴───────────────┴───────┴─────────┴───────┘
+/// ```
+///
+/// ## List Encoding
+///
+/// Lists are encoded by first encoding all child elements to the row format.
+///
+/// A "canonical byte array" is then constructed by concatenating the row
+/// encodings of all their elements into a single binary array, followed
+/// by the lengths of each encoded row, and the number of elements, encoded
+/// as big endian `u32`.
+///
+/// This canonical byte array is then encoded using the variable length byte
+/// encoding described above.
+///
+/// _The lengths are not strictly necessary but greatly simplify decode; they
+/// may be removed in a future iteration_.
+///
+/// For example given:
+///
+/// ```text
+/// [1_u8, 2_u8, 3_u8]
+/// [1_u8, null]
+/// []
+/// null
+/// ```
+///
+/// The elements would be converted to:
+///
+/// ```text
+///     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐     ┌──┬──┐        ┌──┬──┐
+///  1  │01│01│  2  │01│02│  3  │01│03│  1  │01│01│  null  │00│00│
+///     └──┴──┘     └──┴──┘     └──┴──┘     └──┴──┘        └──┴──┘
+/// ```
+///
+/// Which would be grouped into the following canonical byte arrays:
+///
+/// ```text
+///                      ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [1_u8, 2_u8, 3_u8]  │01│01│01│02│01│03│00│00│00│02│00│00│00│02│00│00│00│02│00│00│00│03│
+///                      └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+///                       └──── rows ────┘ └───────── row lengths ─────────┘ └─ count ─┘
+///
+///                      ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
+///  [1_u8, null]        │01│01│00│00│00│00│00│02│00│00│00│02│00│00│00│02│
+///                      └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
+/// ```
+///
+/// With `[]` represented by an empty byte array, and `null` a null byte array.
+///
+/// These byte arrays will then be encoded using the variable length byte encoding
+/// described above.
+///
+/// # Ordering
+///
+/// ## Float Ordering
+///
+/// Floats are totally ordered in accordance with the `totalOrder` predicate as defined
+/// in the IEEE 754 (2008 revision) floating point standard.
+///
+/// The ordering established by this does not always agree with the
+/// [`PartialOrd`] and [`PartialEq`] implementations of `f32`.
+/// For example, they consider negative and positive zero equal, while this does not
+///
+/// ## Null Ordering
+///
+/// The encoding described above will order nulls first, this can be inverted by representing
+/// nulls as `0xFF_u8` instead of `0_u8`
+///
+/// ## Reverse Column Ordering
+///
+/// The order of a given column can be reversed by negating the encoded bytes of non-null values
+///
+/// [COBS]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
+/// [byte stuffing]: https://en.wikipedia.org/wiki/High-Level_Data_Link_Control#Asynchronous_framing
+#[derive(Debug)]
+pub struct RowConverter {
+    fields: Arc<[SortField]>,
+    /// State for codecs
+    codecs: Vec<Codec>,
+}
+
+#[derive(Debug)]
+enum Codec {
+    /// No additional codec state is necessary
+    Stateless,
+    /// The interner used to encode dictionary values
+    Dictionary(OrderPreservingInterner),
+    /// A row converter for the child fields
+    /// and the encoding of a row containing only nulls
+    Struct(RowConverter, OwnedRow),
+    /// A row converter for the child field
+    List(RowConverter),
+}
+
+impl Codec {
+    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
+        match &sort_field.data_type {
+            DataType::Dictionary(_, _) => Ok(Self::Dictionary(Default::default())),
+            d if !d.is_nested() => Ok(Self::Stateless),
+            DataType::List(f) | DataType::LargeList(f) => {
+                // The encoded contents will be inverted if descending is set to true
+                // As such we set `descending` to false and negate nulls first if it
+                // is set to true
+                let options = SortOptions {
+                    descending: false,
+                    nulls_first: sort_field.options.nulls_first
+                        != sort_field.options.descending,
+                };
+
+                let field = SortField::new_with_options(f.data_type().clone(), options);
+                let converter = RowConverter::new(vec![field])?;
+                Ok(Self::List(converter))
+            }
+            DataType::Struct(f) => {
+                let sort_fields = f
+                    .iter()
+                    .map(|x| {
+                        SortField::new_with_options(
+                            x.data_type().clone(),
+                            sort_field.options,
+                        )
+                    })
+                    .collect();
+
+                let mut converter = RowConverter::new(sort_fields)?;
+                let nulls: Vec<_> =
+                    f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
+
+                let nulls = converter.convert_columns(&nulls)?;
+                let owned = OwnedRow {
+                    data: nulls.buffer,
+                    config: nulls.config,
+                };
+
+                Ok(Self::Struct(converter, owned))
+            }
+            _ => Err(ArrowError::NotYetImplemented(format!(
+                "not yet implemented: {:?}",
+                sort_field.data_type
+            ))),
+        }
+    }
+
+    fn encoder(&mut self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
+        match self {
+            Codec::Stateless => Ok(Encoder::Stateless),
+            Codec::Dictionary(interner) => {
+                let values = downcast_dictionary_array! {
+                    array => array.values(),
+                    _ => unreachable!()
+                };
+
+                let mapping = compute_dictionary_mapping(interner, values)
+                    .into_iter()
+                    .map(|maybe_interned| {
+                        maybe_interned.map(|interned| interner.normalized_key(interned))
+                    })
+                    .collect();
+
+                Ok(Encoder::Dictionary(mapping))
+            }
+            Codec::Struct(converter, null) => {
+                let v = as_struct_array(array);
+                let rows = converter.convert_columns(v.columns())?;
+                Ok(Encoder::Struct(rows, null.row()))
+            }
+            Codec::List(converter) => {
+                let values = match array.data_type() {
+                    DataType::List(_) => as_list_array(array).values(),
+                    DataType::LargeList(_) => as_large_list_array(array).values(),
+                    _ => unreachable!(),
+                };
+                let rows = converter.convert_columns(&[values])?;
+                Ok(Encoder::List(rows))
+            }
+        }
+    }
+
+    fn size(&self) -> usize {
+        match self {
+            Codec::Stateless => 0,
+            Codec::Dictionary(interner) => interner.size(),
+            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
+            Codec::List(converter) => converter.size(),
+        }
+    }
+}
+
+#[derive(Debug)]
+enum Encoder<'a> {
+    /// No additional encoder state is necessary
+    Stateless,
+    /// The mapping from dictionary keys to normalized keys
+    Dictionary(Vec<Option<&'a [u8]>>),
+    /// The row encoding of the child arrays and the encoding of a null row
+    ///
+    /// It is necessary to encode to a temporary [`Rows`] to avoid serializing
+    /// values that are masked by a null in the parent StructArray, otherwise
+    /// this would establish an ordering between semantically null values
+    Struct(Rows, Row<'a>),
+    /// The row encoding of the child array
+    List(Rows),
+}
+
+/// Configure the data type and sort order for a given column
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SortField {
+    /// Sort options
+    options: SortOptions,
+    /// Data type
+    data_type: DataType,
+}
+
+impl SortField {
+    /// Create a new column with the given data type
+    pub fn new(data_type: DataType) -> Self {
+        Self::new_with_options(data_type, Default::default())
+    }
+
+    /// Create a new column with the given data type and [`SortOptions`]
+    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
+        Self { options, data_type }
+    }
+
+    /// Return size of this instance in bytes.
+    ///
+    /// Includes the size of `Self`.
+    pub fn size(&self) -> usize {
+        self.data_type.size() + std::mem::size_of::<Self>()
+            - std::mem::size_of::<DataType>()
+    }
+}
+
+impl RowConverter {
+    /// Create a new [`RowConverter`] with the provided schema
+    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
+        if !Self::supports_fields(&fields) {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Row format support not yet implemented for: {:?}",
+                fields
+            )));
+        }
+
+        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
+        Ok(Self {
+            fields: fields.into(),
+            codecs,
+        })
+    }
+
+    /// Check if the given fields are supported by the row format.
+    pub fn supports_fields(fields: &[SortField]) -> bool {
+        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
+    }
+
+    fn supports_datatype(d: &DataType) -> bool {
+        match d {
+            _ if !d.is_nested() => true,
+            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
+                Self::supports_datatype(f.data_type())
+            }
+            DataType::Struct(f) => {
+                f.iter().all(|x| Self::supports_datatype(x.data_type()))
+            }
+            _ => false,
+        }
+    }
+
+    /// Convert [`ArrayRef`] columns into [`Rows`]
+    ///
+    /// See [`Row`] for information on when [`Row`] can be compared
+    ///
+    /// # Panics
+    ///
+    /// Panics if the schema of `columns` does not match that provided to [`RowConverter::new`]
+    pub fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
+        if columns.len() != self.fields.len() {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
+                self.fields.len(),
+                columns.len()
+            )));
+        }
+
+        let encoders = columns
+            .iter()
+            .zip(&mut self.codecs)
+            .zip(self.fields.iter())
+            .map(|((column, codec), field)| {
+                if !column.data_type().equals_datatype(&field.data_type) {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "RowConverter column schema mismatch, expected {} got {}",
+                        field.data_type,
+                        column.data_type()
+                    )));
+                }
+                codec.encoder(column.as_ref())
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let config = RowConfig {
+            fields: Arc::clone(&self.fields),
+            // Don't need to validate UTF-8 as it came from an arrow array
+            validate_utf8: false,
+        };
+        let mut rows = new_empty_rows(columns, &encoders, config);
+
+        for ((column, field), encoder) in
+            columns.iter().zip(self.fields.iter()).zip(encoders)
+        {
+            // We encode a column at a time to minimise dispatch overheads
+            encode_column(&mut rows, column.as_ref(), field.options, &encoder)
+        }
+
+        if cfg!(debug_assertions) {
+            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
+            rows.offsets
+                .windows(2)
+                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
+        }
+
+        Ok(rows)
+    }
+
+    /// Convert [`Rows`] columns into [`ArrayRef`]
+    ///
+    /// # Panics
+    ///
+    /// Panics if the rows were not produced by this [`RowConverter`]
+    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
+    where
+        I: IntoIterator<Item = Row<'a>>,
+    {
+        let mut validate_utf8 = false;
+        let mut rows: Vec<_> = rows
+            .into_iter()
+            .map(|row| {
+                assert!(
+                    Arc::ptr_eq(&row.config.fields, &self.fields),
+                    "rows were not produced by this RowConverter"
+                );
+                validate_utf8 |= row.config.validate_utf8;
+                row.data
+            })
+            .collect();
+
+        // SAFETY
+        // We have validated that the rows came from this [`RowConverter`]
+        // and therefore must be valid
+        unsafe { self.convert_raw(&mut rows, validate_utf8) }
+    }
+
+    /// Convert raw bytes into [`ArrayRef`]
+    ///
+    /// # Safety
+    ///
+    /// `rows` must contain valid data for this [`RowConverter`]
+    unsafe fn convert_raw(
+        &self,
+        rows: &mut [&[u8]],
+        validate_utf8: bool,
+    ) -> Result<Vec<ArrayRef>, ArrowError> {
+        self.fields
+            .iter()
+            .zip(&self.codecs)
+            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
+            .collect()
+    }
+
+    /// Returns a [`RowParser`] that can be used to parse [`Row`] from bytes
+    pub fn parser(&self) -> RowParser {
+        RowParser::new(Arc::clone(&self.fields))
+    }
+
+    /// Returns the size of this instance in bytes
+    ///
+    /// Includes the size of `Self`.
+    pub fn size(&self) -> usize {
+        std::mem::size_of::<Self>()
+            + self.fields.iter().map(|x| x.size()).sum::<usize>()
+            + self.codecs.capacity() * std::mem::size_of::<Codec>()
+            + self.codecs.iter().map(Codec::size).sum::<usize>()
+    }
+}
+
+/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
+#[derive(Debug)]
+pub struct RowParser {
+    config: RowConfig,
+}
+
+impl RowParser {
+    fn new(fields: Arc<[SortField]>) -> Self {
+        Self {
+            config: RowConfig {
+                fields,
+                validate_utf8: true,
+            },
+        }
+    }
+
+    /// Creates a [`Row`] from the provided `bytes`.
+    ///
+    /// `bytes` must be a [`Row`] produced by the [`RowConverter`] associated with
+    /// this [`RowParser`], otherwise subsequent operations with the produced [`Row`] may panic
+    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
+        Row {
+            data: bytes,
+            config: &self.config,
+        }
+    }
+}
+
+/// The config of a given set of [`Row`]
+#[derive(Debug, Clone)]
+struct RowConfig {
+    /// The schema for these rows
+    fields: Arc<[SortField]>,
+    /// Whether to run UTF-8 validation when converting to arrow arrays
+    validate_utf8: bool,
+}
+
+/// A row-oriented representation of arrow data, that is normalized for comparison.
+///
+/// See the [module level documentation](self) and [`RowConverter`] for more details.
+#[derive(Debug)]
+pub struct Rows {
+    /// Underlying row bytes
+    buffer: Box<[u8]>,
+    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
+    offsets: Box<[usize]>,
+    /// The config for these rows
+    config: RowConfig,
+}
+
+impl Rows {
+    /// Returns the [`Row`] at index `row`
+    pub fn row(&self, row: usize) -> Row<'_> {
+        let end = self.offsets[row + 1];
+        let start = self.offsets[row];
+        Row {
+            data: &self.buffer[start..end],
+            config: &self.config,
+        }
+    }
+
+    /// Returns the number of rows
+    pub fn num_rows(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    /// Returns an iterator over the [`Row`] in this [`Rows`]
+    pub fn iter(&self) -> RowsIter<'_> {
+        self.into_iter()
+    }
+
+    /// Returns the size of this instance in bytes
+    ///
+    /// Includes the size of `Self`.
+    pub fn size(&self) -> usize {
+        // Size of fields is accounted for as part of RowConverter
+        std::mem::size_of::<Self>()
+            + self.buffer.len()
+            + self.offsets.len() * std::mem::size_of::<usize>()
+    }
+}
+
+impl<'a> IntoIterator for &'a Rows {
+    type Item = Row<'a>;
+    type IntoIter = RowsIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        RowsIter {
+            rows: self,
+            start: 0,
+            end: self.num_rows(),
+        }
+    }
+}
+
+/// An iterator over [`Rows`]
+#[derive(Debug)]
+pub struct RowsIter<'a> {
+    rows: &'a Rows,
+    start: usize,
+    end: usize,
+}
+
+impl<'a> Iterator for RowsIter<'a> {
+    type Item = Row<'a>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.end == self.start {
+            return None;
+        }
+        let row = self.rows.row(self.start);
+        self.start += 1;
+        Some(row)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let len = self.len();
+        (len, Some(len))
+    }
+}
+
+impl<'a> ExactSizeIterator for RowsIter<'a> {
+    fn len(&self) -> usize {
+        self.end - self.start
+    }
+}
+
+impl<'a> DoubleEndedIterator for RowsIter<'a> {
+    fn next_back(&mut self) -> Option<Self::Item> {
+        if self.end == self.start {
+            return None;
+        }
+        // Decrement before indexing, as `end` is one past the last row
+        self.end -= 1;
+        Some(self.rows.row(self.end))
+    }
+}
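Since [`Row`] and [`OwnedRow`] implement `Hash` and `Eq` (see below), the same representation can drive hash-based grouping as well as sorting. A minimal sketch, with the `group_indices` helper being hypothetical and not part of this crate:

```rust
use std::collections::HashMap;

use arrow_array::ArrayRef;
use arrow_row::{OwnedRow, RowConverter, SortField};

/// Hypothetical helper: map each distinct combination of column values
/// to the row indices at which it occurs.
fn group_indices(cols: &[ArrayRef]) -> HashMap<OwnedRow, Vec<usize>> {
    let fields = cols
        .iter()
        .map(|c| SortField::new(c.data_type().clone()))
        .collect();
    let mut converter = RowConverter::new(fields).unwrap();
    let rows = converter.convert_columns(cols).unwrap();

    let mut groups: HashMap<OwnedRow, Vec<usize>> = HashMap::new();
    for (idx, row) in rows.iter().enumerate() {
        // OwnedRow detaches the key bytes from the backing Rows buffer
        groups.entry(row.owned()).or_default().push(idx);
    }
    groups
}
```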
+/// A comparable representation of a row.
+///
+/// See the [module level documentation](self) for more details.
+///
+/// Two [`Row`] can only be compared if they both belong to [`Rows`]
+/// returned by calls to [`RowConverter::convert_columns`] on the same
+/// [`RowConverter`]. If different [`RowConverter`]s are used, any
+/// ordering established by comparing the [`Row`] is arbitrary.
+#[derive(Debug, Copy, Clone)]
+pub struct Row<'a> {
+    data: &'a [u8],
+    config: &'a RowConfig,
+}
+
+impl<'a> Row<'a> {
+    /// Create owned version of the row to detach it from the shared [`Rows`].
+    pub fn owned(&self) -> OwnedRow {
+        OwnedRow {
+            data: self.data.into(),
+            config: self.config.clone(),
+        }
+    }
+}
+
+// Manually implement these as we don't wish to include `fields`
+
+impl<'a> PartialEq for Row<'a> {
+    #[inline]
+    fn eq(&self, other: &Self) -> bool {
+        self.data.eq(other.data)
+    }
+}
+
+impl<'a> Eq for Row<'a> {}
+
+impl<'a> PartialOrd for Row<'a> {
+    #[inline]
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.data.partial_cmp(other.data)
+    }
+}
+
+impl<'a> Ord for Row<'a> {
+    #[inline]
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.data.cmp(other.data)
+    }
+}
+
+impl<'a> Hash for Row<'a> {
+    #[inline]
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.data.hash(state)
+    }
+}
+
+impl<'a> AsRef<[u8]> for Row<'a> {
+    #[inline]
+    fn as_ref(&self) -> &[u8] {
+        self.data
+    }
+}
+
+/// Owned version of a [`Row`] that can be moved/cloned freely.
+///
+/// This contains the data for the one specific row (not the entire buffer of all rows).
+#[derive(Debug, Clone)]
+pub struct OwnedRow {
+    data: Box<[u8]>,
+    config: RowConfig,
+}
+
+impl OwnedRow {
+    /// Get borrowed [`Row`] from owned version.
+    ///
+    /// This is helpful if you want to compare an [`OwnedRow`] with a [`Row`].
+    pub fn row(&self) -> Row<'_> {
+        Row {
+            data: &self.data,
+            config: &self.config,
+        }
+    }
+}
+
+// Manually implement these as we don't wish to include `fields`. Also we just want to use the same `Row` implementations here.
+
+impl PartialEq for OwnedRow {
+    #[inline]
+    fn eq(&self, other: &Self) -> bool {
+        self.row().eq(&other.row())
+    }
+}
+
+impl Eq for OwnedRow {}
+
+impl PartialOrd for OwnedRow {
+    #[inline]
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        self.row().partial_cmp(&other.row())
+    }
+}
+
+impl Ord for OwnedRow {
+    #[inline]
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.row().cmp(&other.row())
+    }
+}
+
+impl Hash for OwnedRow {
+    #[inline]
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.row().hash(state)
+    }
+}
+
+impl AsRef<[u8]> for OwnedRow {
+    #[inline]
+    fn as_ref(&self) -> &[u8] {
+        &self.data
+    }
+}
+
+/// Returns the null sentinel: `0` when nulls sort first, `0xFF` otherwise
+#[inline]
+fn null_sentinel(options: SortOptions) -> u8 {
+    match options.nulls_first {
+        true => 0,
+        false => 0xFF,
+    }
+}
+
+/// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
+fn new_empty_rows(cols: &[ArrayRef], encoders: &[Encoder], config: RowConfig) -> Rows {
+    use fixed::FixedLengthEncoding;
+
+    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
+    let mut lengths = vec![0; num_rows];
+
+    for (array, encoder) in cols.iter().zip(encoders) {
+        match encoder {
+            Encoder::Stateless => {
+                downcast_primitive_array! {
+                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
+                    DataType::Null => {},
+                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
+                    DataType::Binary => as_generic_binary_array::<i32>(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
+                    DataType::Utf8 => as_string_array(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| {
+                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                        }),
+                    DataType::LargeUtf8 => as_largestring_array(array)
+                        .iter()
+                        .zip(lengths.iter_mut())
+                        .for_each(|(slice, length)| {
+                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
+                        }),
+                    DataType::FixedSizeBinary(len) => {
+                        let len = len.to_usize().unwrap();
+                        lengths.iter_mut().for_each(|x| *x += 1 + len)
+                    }
+                    _ => unreachable!(),
+                }
+            }
+            Encoder::Dictionary(dict) => {
+                downcast_dictionary_array! {
+                    array => {
+                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
+                            match v.and_then(|v| dict[v as usize]) {
+                                Some(k) => *length += k.len() + 1,
+                                None => *length += 1,
+                            }
+                        }
+                    }
+                    _ => unreachable!(),
+                }
+            }
+            Encoder::Struct(rows, null) => {
+                let array = as_struct_array(array);
+                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
+                    match array.is_valid(idx) {
+                        true => *length += 1 + rows.row(idx).as_ref().len(),
+                        false => *length += 1 + null.data.len(),
+                    }
+                });
+            }
+            Encoder::List(rows) => match array.data_type() {
+                DataType::List(_) => {
+                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
+                }
+                DataType::LargeList(_) => {
+                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
+                }
+                _ => unreachable!(),
+            },
+        }
+    }
+
+    let mut offsets = Vec::with_capacity(num_rows + 1);
+    offsets.push(0);
+
+    // We initialize the offsets shifted down by one row index.
+    //
+    // As the rows are appended to the offsets will be incremented to match
+    //
+    // For example, consider the case of 3 rows of length 3, 4, and 6 respectively.
+    // The offsets would be initialized to `0, 0, 3, 7`
+    //
+    // Writing the first row entirely would yield `0, 3, 3, 7`
+    // The second, `0, 3, 7, 7`
+    // The third, `0, 3, 7, 13`
+    //
+    // This would be the final offsets for reading
+    //
+    // In this way offsets tracks the position during writing whilst eventually serving
+    // as identifying the offsets of the written rows
+    let mut cur_offset = 0_usize;
+    for l in lengths {
+        offsets.push(cur_offset);
+        cur_offset = cur_offset.checked_add(l).expect("overflow");
+    }
+
+    let buffer = vec![0_u8; cur_offset];
+
+    Rows {
+        buffer: buffer.into(),
+        offsets: offsets.into(),
+        config,
+    }
+}
+
+/// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses
+fn encode_column(
+    out: &mut Rows,
+    column: &dyn Array,
+    opts: SortOptions,
+    encoder: &Encoder<'_>,
+) {
+    match encoder {
+        Encoder::Stateless => {
+            downcast_primitive_array! {
+                column => fixed::encode(out, column, opts),
+                DataType::Null => {}
+                DataType::Boolean => fixed::encode(out, as_boolean_array(column), opts),
+                DataType::Binary => {
+                    variable::encode(out, as_generic_binary_array::<i32>(column).iter(), opts)
+                }
+                DataType::LargeBinary => {
+                    variable::encode(out, as_generic_binary_array::<i64>(column).iter(), opts)
+                }
+                DataType::Utf8 => variable::encode(
+                    out,
+                    as_string_array(column).iter().map(|x| x.map(|x| x.as_bytes())),
+                    opts,
+                ),
+                DataType::LargeUtf8 => variable::encode(
+                    out,
+                    as_largestring_array(column)
+                        .iter()
+                        .map(|x| x.map(|x| x.as_bytes())),
+                    opts,
+                ),
+                DataType::FixedSizeBinary(_) => {
+                    let array = column.as_any().downcast_ref().unwrap();
+                    fixed::encode_fixed_size_binary(out, array, opts)
+                }
+                _ => unreachable!(),
+            }
+        }
+        Encoder::Dictionary(dict) => {
+            downcast_dictionary_array! {
+                column => encode_dictionary(out, column, dict, opts),
+                _ => unreachable!()
+            }
+        }
+        Encoder::Struct(rows, null) => {
+            let array = as_struct_array(column);
+            let null_sentinel = null_sentinel(opts);
+            out.offsets
+                .iter_mut()
+                .skip(1)
+                .enumerate()
+                .for_each(|(idx, offset)| {
+                    let (row, sentinel) = match array.is_valid(idx) {
+                        true => (rows.row(idx), 0x01),
+                        false => (*null, null_sentinel),
+                    };
+                    let end_offset = *offset + 1 + row.as_ref().len();
+                    out.buffer[*offset] = sentinel;
+                    out.buffer[*offset + 1..end_offset].copy_from_slice(row.as_ref());
+                    *offset = end_offset;
+                })
+        }
+        Encoder::List(rows) => match column.data_type() {
+            DataType::List(_) => list::encode(out, rows, opts, as_list_array(column)),
+            DataType::LargeList(_) => {
+                list::encode(out, rows, opts, as_large_list_array(column))
+            }
+            _ => unreachable!(),
+        },
+    }
+}
+
+macro_rules! decode_primitive_helper {
+    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
+        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
+    };
+}
+
+macro_rules! decode_dictionary_helper {
+    ($t:ty, $interner:ident, $v:ident, $options:ident, $rows:ident) => {
+        Arc::new(decode_dictionary::<$t>($interner, $v, $options, $rows)?)
+    };
+}
+
+/// Decodes the provided `field` from `rows`
+///
+/// # Safety
+///
+/// Rows must contain valid data for the provided field
+unsafe fn decode_column(
+    field: &SortField,
+    rows: &mut [&[u8]],
+    codec: &Codec,
+    validate_utf8: bool,
+) -> Result<ArrayRef, ArrowError> {
+    let options = field.options;
+
+    let array: ArrayRef = match codec {
+        Codec::Stateless => {
+            let data_type = field.data_type.clone();
+            downcast_primitive! {
+                data_type => (decode_primitive_helper, rows, data_type, options),
+                DataType::Null => Arc::new(NullArray::new(rows.len())),
+                DataType::Boolean => Arc::new(decode_bool(rows, options)),
+                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
+                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
+                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
+                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
+                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
+                _ => unreachable!()
+            }
+        }
+        Codec::Dictionary(interner) => {
+            let (k, v) = match &field.data_type {
+                DataType::Dictionary(k, v) => (k.as_ref(), v.as_ref()),
+                _ => unreachable!(),
+            };
+            downcast_integer! {
+                k => (decode_dictionary_helper, interner, v, options, rows),
+                _ => unreachable!()
+            }
+        }
+        Codec::Struct(converter, _) => {
+            let (null_count, nulls) = fixed::decode_nulls(rows);
+            rows.iter_mut().for_each(|row| *row = &row[1..]);
+            let children = converter.convert_raw(rows, validate_utf8)?;
+
+            let child_data = children.iter().map(|c| c.data().clone()).collect();
+            let builder = ArrayDataBuilder::new(field.data_type.clone())
+                .len(rows.len())
+                .null_count(null_count)
+                .null_bit_buffer(Some(nulls))
+                .child_data(child_data);
+
+            Arc::new(StructArray::from(builder.build_unchecked()))
+        }
+        Codec::List(converter) => match &field.data_type {
+            DataType::List(_) => {
+                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
+            }
+            DataType::LargeList(_) => {
+                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
+            }
+            _ => unreachable!(),
+        },
+    };
+    Ok(array)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use rand::distributions::uniform::SampleUniform;
+    use rand::distributions::{Distribution, Standard};
+    use rand::{thread_rng, Rng};
+
+    use arrow_array::builder::*;
+    use arrow_array::types::*;
+    use arrow_array::*;
+    use arrow_buffer::i256;
+    use arrow_buffer::Buffer;
+    use arrow_cast::display::array_value_to_string;
+    use arrow_ord::sort::{LexicographicalComparator, SortColumn, SortOptions};
+
+    use super::*;
+
+    #[test]
+    fn test_fixed_width() {
+        let cols = [
+            Arc::new(Int16Array::from_iter([
+                Some(1),
+                Some(2),
+                None,
+                Some(-5),
+                Some(2),
+                Some(2),
+                Some(0),
+            ])) as ArrayRef,
+            Arc::new(Float32Array::from_iter([
+                Some(1.3),
+                Some(2.5),
+                None,
+                Some(4.),
+                Some(0.1),
+                Some(-4.),
+                Some(-0.),
+            ])) as ArrayRef,
+        ];
+
+        let mut converter = RowConverter::new(vec![
+            SortField::new(DataType::Int16),
+            SortField::new(DataType::Float32),
+        ])
+        .unwrap();
+        let rows = converter.convert_columns(&cols).unwrap();
+
+        assert_eq!(rows.offsets.as_ref(), &[0, 8, 16, 24, 32, 40, 48, 56]);
+        assert_eq!(
+            rows.buffer.as_ref(),
+            &[
+                1, 128, 1, //
+                1, 191, 166, 102, 102, //
+                1, 128, 2, //
+                1, 192, 32, 0, 0, //
+                0, 0, 0, //
+                0, 0, 0, 0, 0, //
+                1, 127, 251, //
+                1, 192, 128, 0, 0, //
+                1, 128, 2, //
+                1, 189, 204, 204, 205, //
+                1, 128, 2, //
+                1, 63, 127, 255, 255, //
+                1, 128, 0, //
+                1, 127, 255, 255, 255 //
+            ]
+        );
+
+        assert!(rows.row(3) < rows.row(6));
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(3) < rows.row(0));
+        assert!(rows.row(4) < rows.row(1));
+        assert!(rows.row(5) < rows.row(4));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        for (expected, actual) in cols.iter().zip(&back) {
+            assert_eq!(expected, actual);
+        }
+    }
+
+    #[test]
+    fn test_decimal128() {
+        let mut converter = RowConverter::new(vec![SortField::new(
+            DataType::Decimal128(DECIMAL128_MAX_PRECISION, 7),
+        )])
+        .unwrap();
+        let col = Arc::new(
+            Decimal128Array::from_iter([
+                None,
+                Some(i128::MIN),
+                Some(-13),
+                Some(46_i128),
+                Some(5456_i128),
+                Some(i128::MAX),
+            ])
+            .with_precision_and_scale(38, 7)
+            .unwrap(),
+        ) as ArrayRef;
+
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+        for i in 0..rows.num_rows() - 1 {
+            assert!(rows.row(i) < rows.row(i + 1));
+        }
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(col.as_ref(), back[0].as_ref())
+    }
+
+    #[test]
+    fn test_decimal256() {
+        let mut converter = RowConverter::new(vec![SortField::new(
+            DataType::Decimal256(DECIMAL256_MAX_PRECISION, 7),
+        )])
+        .unwrap();
+        let col = Arc::new(
+            Decimal256Array::from_iter([
+                None,
+                Some(i256::MIN),
+                Some(i256::from_parts(0, -1)),
+                Some(i256::from_parts(u128::MAX, -1)),
+                Some(i256::from_parts(u128::MAX, 0)),
+                Some(i256::from_parts(0, 46_i128)),
+                Some(i256::from_parts(5, 46_i128)),
+                Some(i256::MAX),
+            ])
+            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
+            .unwrap(),
+        ) as ArrayRef;
+
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+        for i in 0..rows.num_rows() - 1 {
+            assert!(rows.row(i) < rows.row(i + 1));
+        }
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(col.as_ref(), back[0].as_ref())
+    }
+
+    #[test]
+    fn test_bool() {
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
+
+        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)]))
+            as ArrayRef;
+
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+        assert!(rows.row(2) > rows.row(1));
+        assert!(rows.row(2) > rows.row(0));
+        assert!(rows.row(1) > rows.row(0));
+
+        let cols = converter.convert_rows(&rows).unwrap();
+        assert_eq!(&cols[0], &col);
+
+        let mut converter = RowConverter::new(vec![SortField::new_with_options(
+            DataType::Boolean,
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        )])
+        .unwrap();
+
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+        assert!(rows.row(2) < rows.row(1));
+        assert!(rows.row(2) < rows.row(0));
+        assert!(rows.row(1) < rows.row(0));
+        let cols = converter.convert_rows(&rows).unwrap();
+        assert_eq!(&cols[0], &col);
+    }
+
+    #[test]
+    fn test_timezone() {
+        let a = TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5])
+            .with_timezone("+01:00".to_string());
+        let d = a.data_type().clone();
+
+        let mut converter =
+            RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(back[0].data_type(), &d);
+
+        // Test dictionary
+        let mut a =
+            PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
+        a.append(34).unwrap();
+        a.append_null();
+        a.append(345).unwrap();
+
+        // Construct dictionary with a timezone
+        let dict = a.finish();
+        let values = TimestampNanosecondArray::from(dict.values().data().clone());
+        let dict_with_tz = dict.with_values(&values.with_timezone("+02:00".to_string()));
+        let d = DataType::Dictionary(
+            Box::new(DataType::Int32),
+            Box::new(DataType::Timestamp(
+                TimeUnit::Nanosecond,
+                Some("+02:00".to_string()),
+            )),
+        );
+
+        assert_eq!(dict_with_tz.data_type(), &d);
+        let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+        let rows = converter
+            .convert_columns(&[Arc::new(dict_with_tz) as _])
+            .unwrap();
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(back[0].data_type(), &d);
+    }
+
+    #[test]
+    fn test_null_encoding() {
+        let col = Arc::new(NullArray::new(10));
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
+        let rows = converter.convert_columns(&[col]).unwrap();
+        assert_eq!(rows.num_rows(), 10);
+        assert_eq!(rows.row(1).data.len(), 0);
+    }
+
+    #[test]
+    fn test_variable_width() {
+        let col = Arc::new(StringArray::from_iter([
+            Some("hello"),
+            Some("he"),
+            None,
+            Some("foo"),
+            Some(""),
+        ])) as ArrayRef;
+
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+
+        assert!(rows.row(1) < rows.row(0));
+        assert!(rows.row(2) < rows.row(4));
+        assert!(rows.row(3) < rows.row(0));
+        assert!(rows.row(3) < rows.row(1));
+
+        let cols = converter.convert_rows(&rows).unwrap();
+        assert_eq!(&cols[0], &col);
+
+        let col = Arc::new(BinaryArray::from_iter([
+            None,
+            Some(vec![0_u8; 0]),
+            Some(vec![0_u8; 6]),
+            Some(vec![0_u8; variable::BLOCK_SIZE]),
+            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
+            Some(vec![1_u8; 6]),
+            Some(vec![1_u8; variable::BLOCK_SIZE]),
+            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
+            Some(vec![0xFF_u8; 6]),
+            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
+            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
+        ])) as ArrayRef;
+
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+
+        for i in 0..rows.num_rows() {
+            for j in i + 1..rows.num_rows() {
+                assert!(
+                    rows.row(i) < rows.row(j),
+                    "{} < {} - {:?} < {:?}",
+                    i,
+                    j,
+                    rows.row(i),
+                    rows.row(j)
+                );
+            }
+        }
+
+        let cols = converter.convert_rows(&rows).unwrap();
+        assert_eq!(&cols[0], &col);
+
+        let mut converter = RowConverter::new(vec![SortField::new_with_options(
+            DataType::Binary,
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        )])
+        .unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
+
+        for i in 0..rows.num_rows() {
+            for j in i + 1..rows.num_rows() {
+                assert!(
+                    rows.row(i) > rows.row(j),
+                    "{} > {} - {:?} > {:?}",
+                    i,
+                    j,
+                    rows.row(i),
+                    rows.row(j)
+                );
+            }
+        }
+
+        let cols = converter.convert_rows(&rows).unwrap();
+        assert_eq!(&cols[0], &col);
+    }
+
+    #[test]
+    fn test_string_dictionary() {
+        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
+            Some("foo"),
+            Some("hello"),
+            Some("he"),
+            None,
+            Some("hello"),
+            Some(""),
+            Some("hello"),
+            Some("hello"),
+        ])) as ArrayRef;
+
+        let mut converter =
+            RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
+
+        assert!(rows_a.row(3) < rows_a.row(5));
+        assert!(rows_a.row(2) < rows_a.row(1));
+        assert!(rows_a.row(0) < rows_a.row(1));
+        assert!(rows_a.row(3) < rows_a.row(0));
+
+        assert_eq!(rows_a.row(1), rows_a.row(4));
+        assert_eq!(rows_a.row(1), rows_a.row(6));
+        assert_eq!(rows_a.row(1), rows_a.row(7));
+
+        let cols = converter.convert_rows(&rows_a).unwrap();
+        assert_eq!(&cols[0], &a);
+
+        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
+            Some("hello"),
+            None,
+            Some("cupcakes"),
+        ])) as ArrayRef;
+
+        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
+        assert_eq!(rows_a.row(1), rows_b.row(0));
+        assert_eq!(rows_a.row(3), rows_b.row(1));
+        assert!(rows_b.row(2) < rows_a.row(0));
+
+        let cols = converter.convert_rows(&rows_b).unwrap();
+        assert_eq!(&cols[0], &b);
+
+        let mut converter = RowConverter::new(vec![SortField::new_with_options(
+            a.data_type().clone(),
+            SortOptions {
+                descending: true,
+                nulls_first: false,
+            },
+        )])
+        .unwrap();
+
+        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
+        assert!(rows_c.row(3) > rows_c.row(5));
+        assert!(rows_c.row(2) > rows_c.row(1));
+        assert!(rows_c.row(0) > rows_c.row(1));
+        assert!(rows_c.row(3) > rows_c.row(0));
+
+        let cols = converter.convert_rows(&rows_c).unwrap();
+        assert_eq!(&cols[0], &a);
+
+        let mut converter = RowConverter::new(vec![SortField::new_with_options(
+            a.data_type().clone(),
+            SortOptions {
+                descending: true,
+                nulls_first: true,
+            },
+        )])
+        .unwrap();
+
+        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
+        assert!(rows_c.row(3) < rows_c.row(5));
+        assert!(rows_c.row(2) > rows_c.row(1));
+        assert!(rows_c.row(0) > rows_c.row(1));
+        assert!(rows_c.row(3) < rows_c.row(0));
+
+        let cols = converter.convert_rows(&rows_c).unwrap();
+        assert_eq!(&cols[0], &a);
+    }
+
+    #[test]
+    fn test_struct() {
+        // Test basic
+        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
+        let a_f = Field::new("int", DataType::Int32, false);
+        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
+        let u_f = Field::new("s", DataType::Utf8, false);
+        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
+
+        let sort_fields = vec![SortField::new(s1.data_type().clone())];
+        let mut converter = RowConverter::new(sort_fields).unwrap();
+        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
+
+        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
+            assert!(a < b);
+        }
+
+        let back = converter.convert_rows(&r1).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(&back[0], &s1);
+
+        // Test struct nullability
+        let data = s1
+            .data()
+            .clone()
+            .into_builder()
+            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
+            .null_count(2)
+            .build()
+            .unwrap();
+
+        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
+        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
+        assert_eq!(r2.row(0), r2.row(2)); // Nulls equal
+        assert!(r2.row(0) < r2.row(1)); // Nulls first
+        assert_ne!(r1.row(0), r2.row(0)); // Value does not equal null
+        assert_eq!(r1.row(1), r2.row(1)); // Values equal
+
+        let back = converter.convert_rows(&r2).unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(&back[0], &s2);
+
+        back[0].data().validate_full().unwrap();
+    }
+
+    #[test]
+    fn test_primitive_dictionary() {
+        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
+        builder.append(2).unwrap();
+        builder.append(3).unwrap();
+        builder.append(0).unwrap();
+        builder.append_null();
+        builder.append(5).unwrap();
+        builder.append(3).unwrap();
+        builder.append(-1).unwrap();
+
+        let a = builder.finish();
+
+        let mut converter =
+            RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
+        let rows = converter.convert_columns(&[Arc::new(a)]).unwrap();
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(2) < rows.row(0));
+        assert!(rows.row(3) < rows.row(2));
+        assert!(rows.row(6) < rows.row(2));
+        assert!(rows.row(3) < rows.row(6));
+    }
+
+    #[test]
+    fn test_dictionary_nulls() {
+        let values =
+            Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
+        let keys =
+            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None])
+                .into_data();
+
+        let data_type =
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
+        let data = keys
+            .into_builder()
+            .data_type(data_type.clone())
+            .child_data(vec![values])
+            .build()
+            .unwrap();
+
+        let mut converter = RowConverter::new(vec![SortField::new(data_type)]).unwrap();
+        let rows = converter
+            .convert_columns(&[Arc::new(DictionaryArray::<Int32Type>::from(data))])
+            .unwrap();
+
+        assert_eq!(rows.row(0), rows.row(1));
+        assert_eq!(rows.row(3), rows.row(4));
+        assert_eq!(rows.row(4), rows.row(5));
+        assert!(rows.row(3) < rows.row(0));
+    }
+
+    #[test]
+    #[should_panic(expected = "Invalid UTF-8 sequence")]
+    fn test_invalid_utf8() {
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
+        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
+        let rows = converter.convert_columns(&[array]).unwrap();
+    #[test]
+    #[should_panic(expected = "Invalid UTF-8 sequence")]
+    fn test_invalid_utf8() {
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
+        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
+        let rows = converter.convert_columns(&[array]).unwrap();
+        let binary_row = rows.row(0);
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parser = converter.parser();
+        let utf8_row = parser.parse(binary_row.as_ref());
+
+        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "rows were not produced by this RowConverter")]
+    fn test_different_converter() {
+        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
+        let mut converter =
+            RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
+        let rows = converter.convert_columns(&[values]).unwrap();
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
+        let _ = converter.convert_rows(&rows);
+    }
+
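+    // Generic over the offset size so that `test_list` (i32 offsets) and
+    // `test_large_list` (i64 offsets) below can drive the same assertions.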
+    fn test_single_list<O: OffsetSizeTrait>() {
+        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(32);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.values().append_value(12);
+        builder.append(true);
+        builder.values().append_value(32);
+        builder.values().append_value(52);
+        builder.append(true);
+        builder.values().append_value(32); // MASKED
+        builder.values().append_value(52); // MASKED
+        builder.append(false);
+        builder.values().append_value(32);
+        builder.values().append_null();
+        builder.append(true);
+        builder.append(true);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        let mut converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
+
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
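+        // Repeat for the remaining SortOptions permutations: the element
+        // ordering flips under `descending`, and `nulls_first` controls
+        // whether null lists sort before or after everything else.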
+        let options = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1)); // [32, 52, 32] > [32, 52, 12]
+        assert!(rows.row(2) < rows.row(1)); // [32, 52] < [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+        assert!(rows.row(5) < rows.row(2)); // [] < [32, 52]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+        assert!(rows.row(3) > rows.row(2)); // null > [32, 52]
+        assert!(rows.row(4) > rows.row(2)); // [32, null] > [32, 52]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
+        assert!(rows.row(3) > rows.row(5)); // null > []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d, options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1)); // [32, 52, 32] < [32, 52, 12]
+        assert!(rows.row(2) > rows.row(1)); // [32, 52] > [32, 52, 12]
+        assert!(rows.row(3) < rows.row(2)); // null < [32, 52]
+        assert!(rows.row(4) < rows.row(2)); // [32, null] < [32, 52]
+        assert!(rows.row(5) > rows.row(2)); // [] > [32, 52]
+        assert!(rows.row(3) < rows.row(5)); // null < []
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+    }
+
+    fn test_nested_list<O: OffsetSizeTrait>() {
+        let mut builder = GenericListBuilder::<O, _>::new(
+            GenericListBuilder::<O, _>::new(Int32Builder::new()),
+        );
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.append(true);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_null();
+        builder.values().append(true);
+        builder.values().append(false);
+        builder.append(true);
+        builder.append(false);
+
+        builder.values().values().append_value(1);
+        builder.values().values().append_value(2);
+        builder.values().append(true);
+        builder.append(true);
+
+        let list = Arc::new(builder.finish()) as ArrayRef;
+        let d = list.data_type().clone();
+
+        // [
+        //     [[1, 2], [1, null]],
+        //     [[1, null], [1, null]],
+        //     [[1, null], null],
+        //     null,
+        //     [[1, 2]],
+        // ]
+        let options = SortOptions {
+            descending: false,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) < rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
+        let field = SortField::new_with_options(d.clone(), options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) > rows.row(1));
+        assert!(rows.row(1) > rows.row(2));
+        assert!(rows.row(2) > rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) > rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+
+        let options = SortOptions {
+            descending: true,
+            nulls_first: false,
+        };
+        let field = SortField::new_with_options(d, options);
+        let mut converter = RowConverter::new(vec![field]).unwrap();
+        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
+
+        assert!(rows.row(0) < rows.row(1));
+        assert!(rows.row(1) < rows.row(2));
+        assert!(rows.row(2) < rows.row(3));
+        assert!(rows.row(4) > rows.row(0));
+        assert!(rows.row(4) < rows.row(1));
+
+        let back = converter.convert_rows(&rows).unwrap();
+        assert_eq!(back.len(), 1);
+        back[0].data().validate_full().unwrap();
+        assert_eq!(&back[0], &list);
+    }
+
+    #[test]
+    fn test_list() {
+        test_single_list::<i32>();
+        test_nested_list::<i32>();
+    }
+
+    #[test]
+    fn test_large_list() {
+        test_single_list::<i64>();
+        test_nested_list::<i64>();
+    }
+
+    fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
+    where
+        K: ArrowPrimitiveType,
+        Standard: Distribution<K::Native>,
+    {
+        let mut rng = thread_rng();
+        (0..len)
+            .map(|_| rng.gen_bool(valid_percent).then(|| rng.gen()))
+            .collect()
+    }
+
+    fn generate_strings<O: OffsetSizeTrait>(
+        len: usize,
+        valid_percent: f64,
+    ) -> GenericStringArray<O> {
+        let mut rng = thread_rng();
+        (0..len)
+            .map(|_| {
+                rng.gen_bool(valid_percent).then(|| {
+                    let len = rng.gen_range(0..100);
+                    let bytes = (0..len).map(|_| rng.gen_range(0..128)).collect();
+                    String::from_utf8(bytes).unwrap()
+                })
+            })
+            .collect()
+    }
+
+    fn generate_dictionary<K>(
+        values: ArrayRef,
+        len: usize,
+        valid_percent: f64,
+    ) -> DictionaryArray<K>
+    where
+        K: ArrowDictionaryKeyType,
+        K::Native: SampleUniform,
+    {
+        let mut rng = thread_rng();
+        let min_key = K::Native::from_usize(0).unwrap();
+        let max_key = K::Native::from_usize(values.len()).unwrap();
+        let keys: PrimitiveArray<K> = (0..len)
+            .map(|_| {
+                rng.gen_bool(valid_percent)
+                    .then(|| rng.gen_range(min_key..max_key))
+            })
+            .collect();
+
+        let data_type = DataType::Dictionary(
+            Box::new(K::DATA_TYPE),
+            Box::new(values.data_type().clone()),
+        );
+
+        let data = keys
+            .into_data()
+            .into_builder()
+            .data_type(data_type)
+            .add_child_data(values.data().clone())
+            .build()
+            .unwrap();
+
+        DictionaryArray::from(data)
+    }
+
+    fn generate_fixed_size_binary(
+        len: usize,
+        valid_percent: f64,
+    ) -> FixedSizeBinaryArray {
+        let mut rng = thread_rng();
+        let width = rng.gen_range(0..20);
+        let mut builder = FixedSizeBinaryBuilder::new(width);
+
+        let mut b = vec![0; width as usize];
+        for _ in 0..len {
+            match rng.gen_bool(valid_percent) {
+                true => {
+                    b.iter_mut().for_each(|x| *x = rng.gen());
+                    builder.append_value(&b).unwrap();
+                }
+                false => builder.append_null(),
+            }
+        }
+
+        builder.finish()
+    }
+
+    fn generate_column(len: usize) -> ArrayRef {
+        let mut rng = thread_rng();
+        match rng.gen_range(0..10) {
+            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
+            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
+            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
+            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
+            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
+            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
+            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
+            7 => Arc::new(generate_dictionary::<Int64Type>(
+                // Cannot test dictionaries containing null values because of #2687
+                Arc::new(generate_strings::<i32>(rng.gen_range(1..len), 1.0)),
+                len,
+                0.8,
+            )),
+            8 => Arc::new(generate_dictionary::<Int64Type>(
+                // Cannot test dictionaries containing null values because of #2687
+                Arc::new(generate_primitive_array::<Int64Type>(
+                    rng.gen_range(1..len),
+                    1.0,
+                )),
+                len,
+                0.8,
+            )),
+            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
+            _ => unreachable!(),
+        }
+    }
+
+    fn print_row(cols: &[SortColumn], row: usize) -> String {
+        let t: Vec<_> = cols
+            .iter()
+            .map(|x| array_value_to_string(&x.values, row).unwrap())
+            .collect();
+        t.join(",")
+    }
+
+    fn print_col_types(cols: &[SortColumn]) -> String {
+        let t: Vec<_> = cols
+            .iter()
+            .map(|x| x.values.data_type().to_string())
+            .collect();
+        t.join(",")
+    }
+
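+    // Oracle test: for every pair (i, j), comparing the encoded rows as
+    // opaque byte strings must agree with LexicographicalComparator on the
+    // original columns, and convert_rows must round-trip the input arrays.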
+    #[test]
+    #[cfg_attr(miri, ignore)]
+    fn fuzz_test() {
+        for _ in 0..100 {
+            let mut rng = thread_rng();
+            let num_columns = rng.gen_range(1..5);
+            let len = rng.gen_range(5..100);
+            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
+
+            let options: Vec<_> = (0..num_columns)
+                .map(|_| SortOptions {
+                    descending: rng.gen_bool(0.5),
+                    nulls_first: rng.gen_bool(0.5),
+                })
+                .collect();
+
+            let sort_columns: Vec<_> = options
+                .iter()
+                .zip(&arrays)
+                .map(|(o, c)| SortColumn {
+                    values: Arc::clone(c),
+                    options: Some(*o),
+                })
+                .collect();
+
+            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
+
+            let columns = options
+                .into_iter()
+                .zip(&arrays)
+                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
+                .collect();
+
+            let mut converter = RowConverter::new(columns).unwrap();
+            let rows = converter.convert_columns(&arrays).unwrap();
+
+            for i in 0..len {
+                for j in 0..len {
+                    let row_i = rows.row(i);
+                    let row_j = rows.row(j);
+                    let row_cmp = row_i.cmp(&row_j);
+                    let lex_cmp = comparator.compare(i, j);
+                    assert_eq!(
+                        row_cmp,
+                        lex_cmp,
+                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
+                        print_row(&sort_columns, i),
+                        print_row(&sort_columns, j),
+                        row_i,
+                        row_j,
+                        print_col_types(&sort_columns)
+                    );
+                }
+            }
+
+            let back = converter.convert_rows(&rows).unwrap();
+            for (actual, expected) in back.iter().zip(&arrays) {
+                actual.data().validate_full().unwrap();
+                assert_eq!(actual, expected)
+            }
+        }
+    }
+}
diff --git a/arrow/src/row/list.rs b/arrow-row/src/list.rs
similarity index 97%
rename from arrow/src/row/list.rs
rename to arrow-row/src/list.rs
index e5ea5c2a04c..7cc5eb3999d 100644
--- a/arrow/src/row/list.rs
+++ b/arrow-row/src/list.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::compute::SortOptions;
-use crate::row::{RowConverter, Rows, SortField};
+use crate::{RowConverter, Rows, SortField};
 use arrow_array::builder::BufferBuilder;
-use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
+use arrow_array::{Array, GenericListArray, OffsetSizeTrait, SortOptions};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::ArrowError;
 use std::ops::Range;
diff --git a/arrow/src/row/mod.rs b/arrow-row/src/lib.rs
similarity index 100%
rename from arrow/src/row/mod.rs
rename to arrow-row/src/lib.rs
diff --git a/arrow/src/row/variable.rs b/arrow-row/src/variable.rs
similarity index 97%
rename from arrow/src/row/variable.rs
rename to arrow-row/src/variable.rs
index 9162f231203..93c96e64165 100644
--- a/arrow/src/row/variable.rs
+++ b/arrow-row/src/variable.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
-use crate::compute::SortOptions;
-use crate::row::{null_sentinel, Rows};
-use crate::util::bit_util::ceil;
+use crate::{null_sentinel, Rows};
 use arrow_array::builder::BufferBuilder;
-use arrow_array::{Array, GenericBinaryArray, GenericStringArray, OffsetSizeTrait};
+use arrow_array::*;
+use arrow_buffer::bit_util::ceil;
 use arrow_buffer::MutableBuffer;
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::DataType;
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index d57168dc9ea..8611acf52fe 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -328,7 +328,7 @@ pub mod pyarrow;
 pub mod record_batch {
     pub use arrow_array::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 }
-pub mod row;
 pub use arrow_array::temporal_conversions;
+pub use arrow_row as row;
 pub mod tensor;
 pub mod util;
diff --git a/dev/release/README.md b/dev/release/README.md
index 75849641d8b..a18d8a4992c 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -258,13 +258,12 @@ Rust Arrow Crates:
 (cd arrow-array && cargo publish)
 (cd arrow-select && cargo publish)
 (cd arrow-cast && cargo publish)
-(cd arrow-string && cargo publish)
-(cd arrow-ord && cargo publish)
 (cd arrow-ipc && cargo publish)
 (cd arrow-csv && cargo publish)
 (cd arrow-json && cargo publish)
 (cd arrow-ord && cargo publish)
 (cd arrow-string && cargo publish)
+(cd arrow-row && cargo publish)
 (cd arrow && cargo publish)
 (cd arrow-flight && cargo publish)
 (cd parquet && cargo publish)
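
For reference, a minimal sketch (not part of this diff) of how downstream code uses the relocated row format for multi-column sorting, assuming only the public API exercised by the tests above (RowConverter, SortField, convert_columns, and Row's Ord implementation):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, StringArray};
use arrow_row::{RowConverter, SortField};
use arrow_schema::DataType;

fn main() {
    // Two sort keys: a Utf8 column, then an Int32 column, both ascending
    let names = Arc::new(StringArray::from(vec!["b", "a", "b"])) as ArrayRef;
    let ages = Arc::new(Int32Array::from(vec![2, 3, 1])) as ArrayRef;

    let mut converter = RowConverter::new(vec![
        SortField::new(DataType::Utf8),
        SortField::new(DataType::Int32),
    ])
    .unwrap();

    // Each input row is encoded as a single byte string whose byte-wise
    // ordering matches the lexicographic ordering of the original columns
    let rows = converter.convert_columns(&[names, ages]).unwrap();

    let mut indices: Vec<usize> = (0..rows.num_rows()).collect();
    indices.sort_unstable_by(|&a, &b| rows.row(a).cmp(&rows.row(b)));
    assert_eq!(indices, vec![1, 2, 0]); // ("a", 3), ("b", 1), ("b", 2)
}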