Skip to content

Commit

Permalink
Comparable Row Format (#2593)
Browse files Browse the repository at this point in the history
* Add row format

* Skip miri on heavier tests

* Handle nulls in dictionary values

* Don't fuzz test dictionaries with null values

* Add docs

* Add error plumbing

* Review feedback

* Fix docs
  • Loading branch information
tustvold committed Sep 10, 2022
1 parent 41e0187 commit a1d24e4
Show file tree
Hide file tree
Showing 8 changed files with 1,734 additions and 3 deletions.
5 changes: 5 additions & 0 deletions arrow/Cargo.toml
Expand Up @@ -233,3 +233,8 @@ harness = false
[[bench]]
name = "decimal_validate"
harness = false

[[bench]]
name = "row_format"
harness = false
required-features = ["test_utils"]
114 changes: 114 additions & 0 deletions arrow/benches/row_format.rs
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
extern crate core;

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Int64Type, UInt64Type};
use arrow::row::{RowConverter, SortField};
use arrow::util::bench_util::{create_primitive_array, create_string_array_with_len};
use criterion::{black_box, Criterion};
use std::sync::Arc;

fn row_bench(c: &mut Criterion) {
let cols = vec![Arc::new(create_primitive_array::<UInt64Type>(4096, 0.)) as ArrayRef];

c.bench_function("row_batch 4096 u64(0)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::UInt64)]);
black_box(converter.convert_columns(&cols))
});
});

let cols = vec![Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef];

c.bench_function("row_batch 4096 i64(0)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)]);
black_box(converter.convert_columns(&cols))
});
});

let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 10)) as ArrayRef];

c.bench_function("row_batch 4096 string(10, 0)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
black_box(converter.convert_columns(&cols))
});
});

let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef];

c.bench_function("row_batch 4096 string(30, 0)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
black_box(converter.convert_columns(&cols))
});
});

let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef];

c.bench_function("row_batch 4096 string(100, 0)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
black_box(converter.convert_columns(&cols))
});
});

let cols =
vec![Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 100)) as ArrayRef];

c.bench_function("row_batch 4096 string(100, 0.5)", |b| {
b.iter(|| {
let mut converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]);
black_box(converter.convert_columns(&cols))
});
});

let cols = [
Arc::new(create_string_array_with_len::<i32>(4096, 0.5, 20)) as ArrayRef,
Arc::new(create_string_array_with_len::<i32>(4096, 0., 30)) as ArrayRef,
Arc::new(create_string_array_with_len::<i32>(4096, 0., 100)) as ArrayRef,
Arc::new(create_primitive_array::<Int64Type>(4096, 0.)) as ArrayRef,
];

let fields = [
SortField::new(DataType::Utf8),
SortField::new(DataType::Utf8),
SortField::new(DataType::Utf8),
SortField::new(DataType::Int64),
];

c.bench_function(
"row_batch 4096 string(20, 0.5), string(30, 0), string(100, 0), i64(0)",
|b| {
b.iter(|| {
let mut converter = RowConverter::new(fields.to_vec());
black_box(converter.convert_columns(&cols))
});
},
);
}

criterion_group!(benches, row_bench);
criterion_main!(benches);
6 changes: 3 additions & 3 deletions arrow/src/compute/kernels/sort.rs
Expand Up @@ -1071,13 +1071,13 @@ type LexicographicalCompareItem<'a> = (

/// A lexicographical comparator that wraps given array data (columns) and can lexicographically compare data
/// at given two indices. The lifetime is the same at the data wrapped.
pub(super) struct LexicographicalComparator<'a> {
pub(crate) struct LexicographicalComparator<'a> {
compare_items: Vec<LexicographicalCompareItem<'a>>,
}

impl LexicographicalComparator<'_> {
/// lexicographically compare values at the wrapped columns with given indices.
pub(super) fn compare<'a, 'b>(
pub(crate) fn compare<'a, 'b>(
&'a self,
a_idx: &'b usize,
b_idx: &'b usize,
Expand Down Expand Up @@ -1121,7 +1121,7 @@ impl LexicographicalComparator<'_> {

/// Create a new lex comparator that will wrap the given sort columns and give comparison
/// results with two indices.
pub(super) fn try_new(
pub(crate) fn try_new(
columns: &[SortColumn],
) -> Result<LexicographicalComparator<'_>> {
let compare_items = columns
Expand Down
1 change: 1 addition & 0 deletions arrow/src/lib.rs
Expand Up @@ -269,6 +269,7 @@ pub mod json;
#[cfg(feature = "pyarrow")]
pub mod pyarrow;
pub mod record_batch;
pub mod row;
pub mod temporal_conversions;
pub mod tensor;
pub mod util;
160 changes: 160 additions & 0 deletions arrow/src/row/fixed.rs
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::array::PrimitiveArray;
use crate::compute::SortOptions;
use crate::datatypes::ArrowPrimitiveType;
use crate::row::Rows;
use crate::util::decimal::{Decimal128, Decimal256};
use half::f16;

/// Encodes a value of a particular fixed width type into bytes according to the rules
/// described on [`super::RowConverter`]
pub trait FixedLengthEncoding<const N: usize>: Copy {
const ENCODED_LEN: usize = 1 + N;

fn encode(self) -> [u8; N];
}

impl FixedLengthEncoding<1> for bool {
fn encode(self) -> [u8; 1] {
[self as u8]
}
}

macro_rules! encode_signed {
($n:expr, $t:ty) => {
impl FixedLengthEncoding<$n> for $t {
fn encode(self) -> [u8; $n] {
let mut b = self.to_be_bytes();
// Toggle top "sign" bit to ensure consistent sort order
b[0] ^= 0x80;
b
}
}
};
}

encode_signed!(1, i8);
encode_signed!(2, i16);
encode_signed!(4, i32);
encode_signed!(8, i64);
encode_signed!(16, i128);

macro_rules! encode_unsigned {
($n:expr, $t:ty) => {
impl FixedLengthEncoding<$n> for $t {
fn encode(self) -> [u8; $n] {
self.to_be_bytes()
}
}
};
}

encode_unsigned!(1, u8);
encode_unsigned!(2, u16);
encode_unsigned!(4, u32);
encode_unsigned!(8, u64);

impl FixedLengthEncoding<2> for f16 {
fn encode(self) -> [u8; 2] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i16;
let val = s ^ (((s >> 15) as u16) >> 1) as i16;
val.encode()
}
}

impl FixedLengthEncoding<4> for f32 {
fn encode(self) -> [u8; 4] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i32;
let val = s ^ (((s >> 31) as u32) >> 1) as i32;
val.encode()
}
}

impl FixedLengthEncoding<8> for f64 {
fn encode(self) -> [u8; 8] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i64;
let val = s ^ (((s >> 63) as u64) >> 1) as i64;
val.encode()
}
}

impl FixedLengthEncoding<16> for Decimal128 {
fn encode(self) -> [u8; 16] {
let mut val = *self.raw_value();
// Convert to big endian representation
val.reverse();
// Toggle top "sign" bit to ensure consistent sort order
val[0] ^= 0x80;
val
}
}

impl FixedLengthEncoding<32> for Decimal256 {
fn encode(self) -> [u8; 32] {
let mut val = *self.raw_value();
// Convert to big endian representation
val.reverse();
// Toggle top "sign" bit to ensure consistent sort order
val[0] ^= 0x80;
val
}
}

/// Returns the total encoded length (including null byte) for a value of type `T::Native`
pub const fn encoded_len<const N: usize, T>(_col: &PrimitiveArray<T>) -> usize
where
T: ArrowPrimitiveType,
T::Native: FixedLengthEncoding<N>,
{
T::Native::ENCODED_LEN
}

/// Fixed width types are encoded as
///
/// - 1 byte `0` if null or `1` if valid
/// - bytes of [`FixedLengthEncoding`]
pub fn encode<
const N: usize,
T: FixedLengthEncoding<N>,
I: IntoIterator<Item = Option<T>>,
>(
out: &mut Rows,
i: I,
opts: SortOptions,
) {
for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
let end_offset = *offset + N + 1;
if let Some(val) = maybe_val {
let to_write = &mut out.buffer[*offset..end_offset];
to_write[0] = 1;
let mut encoded = val.encode();
if opts.descending {
// Flip bits to reverse order
encoded.iter_mut().for_each(|v| *v = !*v)
}
to_write[1..].copy_from_slice(&encoded)
} else if !opts.nulls_first {
out.buffer[*offset] = 0xFF;
}
*offset = end_offset;
}
}

0 comments on commit a1d24e4

Please sign in to comment.