Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added comparable row-oriented representation of a collection of [`Arr…
Browse files Browse the repository at this point in the history
…ay`]. (#1287)
  • Loading branch information
RinChanNOWWW committed Nov 4, 2022
1 parent 5bd0c7a commit 562de6a
Show file tree
Hide file tree
Showing 12 changed files with 2,003 additions and 2 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Expand Up @@ -32,6 +32,8 @@ hash_hasher = "^2.0.3"
simdutf8 = "0.1.3"
# faster hashing
ahash = { version = "0.7" }
# A Rust port of SwissTable
hashbrown = { version = "0.12", default-features = false, optional = true }

# for timezone support
chrono-tz = { version = "0.6", optional = true }
Expand Down Expand Up @@ -207,7 +209,7 @@ compute_merge_sort = ["itertools", "compute_sort"]
compute_nullif = ["compute_comparison"]
compute_partition = ["compute_sort"]
compute_regex_match = ["regex"]
compute_sort = ["compute_take"]
compute_sort = ["compute_take", "hashbrown"]
compute_substring = []
compute_take = []
compute_temporal = []
Expand Down Expand Up @@ -244,6 +246,9 @@ benchmarks = ["rand"]
serde_types = ["serde", "serde_derive"]
simd = []

[build-dependencies]
rustc_version = "0.4.0"

[package.metadata.cargo-all-features]
allowlist = ["compute", "compute_sort", "compute_hash", "compute_nullif"]

Expand Down
7 changes: 7 additions & 0 deletions build.rs
@@ -0,0 +1,7 @@
use rustc_version::{version_meta, Channel};

fn main() {
if version_meta().unwrap().channel == Channel::Nightly {
println!("cargo:rustc-cfg=nightly_build");
}
}
3 changes: 2 additions & 1 deletion src/compute/sort/mod.rs
Expand Up @@ -17,6 +17,7 @@ mod lex_sort;
mod primitive;
mod utf8;

pub mod row;
pub(crate) use lex_sort::build_compare;
pub use lex_sort::{lexsort, lexsort_to_indices, lexsort_to_indices_impl, SortColumn};

Expand Down Expand Up @@ -290,7 +291,7 @@ pub fn can_sort(data_type: &DataType) -> bool {
}

/// Options that define how sort kernels should behave
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SortOptions {
/// Whether to sort in descending order
pub descending: bool,
Expand Down
120 changes: 120 additions & 0 deletions src/compute/sort/row/dictionary.rs
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::{
array::{Array, BinaryArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8Array},
compute::sort::SortOptions,
datatypes::PhysicalType,
error::*,
with_match_primitive_without_interval_type,
};

use super::{
fixed::FixedLengthEncoding,
interner::{Interned, OrderPreservingInterner},
null_sentinel, Rows,
};

/// Computes the dictionary mapping for the given dictionary values
pub fn compute_dictionary_mapping(
interner: &mut OrderPreservingInterner,
values: &Box<dyn Array>,
) -> Result<Vec<Option<Interned>>> {
Ok(match values.data_type().to_physical_type() {
PhysicalType::Primitive(primitive) => {
with_match_primitive_without_interval_type!(primitive, |$T| {
let values = values
.as_any()
.downcast_ref::<PrimitiveArray<$T>>()
.unwrap();
interner.intern(values.iter().map(|x| x.map(|x| x.encode())))
})
}
PhysicalType::Binary => {
let iter = values
.as_any()
.downcast_ref::<BinaryArray<i32>>()
.unwrap()
.iter();
interner.intern(iter)
}
PhysicalType::LargeBinary => {
let iter = values
.as_any()
.downcast_ref::<BinaryArray<i64>>()
.unwrap()
.iter();
interner.intern(iter)
}
PhysicalType::Utf8 => {
let iter = values
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.unwrap()
.iter()
.map(|x| x.map(|x| x.as_bytes()));
interner.intern(iter)
}
PhysicalType::LargeUtf8 => {
let iter = values
.as_any()
.downcast_ref::<Utf8Array<i64>>()
.unwrap()
.iter()
.map(|x| x.map(|x| x.as_bytes()));
interner.intern(iter)
}
t => {
return Err(Error::NotYetImplemented(format!(
"dictionary value {:?} is not supported",
t
)))
}
})
}

/// Dictionary types are encoded as
///
/// - single `0_u8` if null
/// - the bytes of the corresponding normalized key including the null terminator
pub fn encode_dictionary<K: DictionaryKey>(
out: &mut Rows,
column: &DictionaryArray<K>,
normalized_keys: &[Option<&[u8]>],
opts: SortOptions,
) {
for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
match k.and_then(|k| normalized_keys[unsafe { k.as_usize() }]) {
Some(normalized_key) => {
let end_offset = *offset + 1 + normalized_key.len();
out.buffer[*offset] = 1;
out.buffer[*offset + 1..end_offset].copy_from_slice(normalized_key);
// Negate if descending
if opts.descending {
out.buffer[*offset..end_offset]
.iter_mut()
.for_each(|v| *v = !*v)
}
*offset = end_offset;
}
None => {
out.buffer[*offset] = null_sentinel(opts);
*offset += 1;
}
}
}
}
198 changes: 198 additions & 0 deletions src/compute/sort/row/fixed.rs
@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::{
array::PrimitiveArray,
compute::sort::SortOptions,
types::{f16, i256, NativeType},
};

use super::{null_sentinel, Rows};

pub trait FromSlice {
fn from_slice(slice: &[u8], invert: bool) -> Self;
}

impl<const N: usize> FromSlice for [u8; N] {
#[inline]
fn from_slice(slice: &[u8], invert: bool) -> Self {
let mut t: Self = slice.try_into().unwrap();
if invert {
t.iter_mut().for_each(|o| *o = !*o);
}
t
}
}

/// Encodes a value of a particular fixed width type into bytes according to the rules
/// described on [`super::RowConverter`]
pub trait FixedLengthEncoding: Copy {
const ENCODED_LEN: usize = 1 + std::mem::size_of::<Self::Encoded>();

type Encoded: Sized + Copy + FromSlice + AsRef<[u8]> + AsMut<[u8]>;

fn encode(self) -> Self::Encoded;

fn decode(encoded: Self::Encoded) -> Self;
}

impl FixedLengthEncoding for bool {
type Encoded = [u8; 1];

fn encode(self) -> [u8; 1] {
[self as u8]
}

fn decode(encoded: Self::Encoded) -> Self {
encoded[0] != 0
}
}

macro_rules! encode_signed {
($n:expr, $t:ty) => {
impl FixedLengthEncoding for $t {
type Encoded = [u8; $n];

fn encode(self) -> [u8; $n] {
let mut b = self.to_be_bytes();
// Toggle top "sign" bit to ensure consistent sort order
b[0] ^= 0x80;
b
}

fn decode(mut encoded: Self::Encoded) -> Self {
// Toggle top "sign" bit
encoded[0] ^= 0x80;
Self::from_be_bytes(encoded)
}
}
};
}

encode_signed!(1, i8);
encode_signed!(2, i16);
encode_signed!(4, i32);
encode_signed!(8, i64);
encode_signed!(16, i128);
encode_signed!(32, i256);

macro_rules! encode_unsigned {
($n:expr, $t:ty) => {
impl FixedLengthEncoding for $t {
type Encoded = [u8; $n];

fn encode(self) -> [u8; $n] {
self.to_be_bytes()
}

fn decode(encoded: Self::Encoded) -> Self {
Self::from_be_bytes(encoded)
}
}
};
}

encode_unsigned!(1, u8);
encode_unsigned!(2, u16);
encode_unsigned!(4, u32);
encode_unsigned!(8, u64);

impl FixedLengthEncoding for f16 {
type Encoded = [u8; 2];

fn encode(self) -> [u8; 2] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i16;
let val = s ^ (((s >> 15) as u16) >> 1) as i16;
val.encode()
}

fn decode(encoded: Self::Encoded) -> Self {
let bits = i16::decode(encoded);
let val = bits ^ (((bits >> 15) as u16) >> 1) as i16;
Self::from_bits(val as u16)
}
}

impl FixedLengthEncoding for f32 {
type Encoded = [u8; 4];

fn encode(self) -> [u8; 4] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i32;
let val = s ^ (((s >> 31) as u32) >> 1) as i32;
val.encode()
}

fn decode(encoded: Self::Encoded) -> Self {
let bits = i32::decode(encoded);
let val = bits ^ (((bits >> 31) as u32) >> 1) as i32;
Self::from_bits(val as u32)
}
}

impl FixedLengthEncoding for f64 {
type Encoded = [u8; 8];

fn encode(self) -> [u8; 8] {
// https://github.com/rust-lang/rust/blob/9c20b2a8cc7588decb6de25ac6a7912dcef24d65/library/core/src/num/f32.rs#L1176-L1260
let s = self.to_bits() as i64;
let val = s ^ (((s >> 63) as u64) >> 1) as i64;
val.encode()
}

fn decode(encoded: Self::Encoded) -> Self {
let bits = i64::decode(encoded);
let val = bits ^ (((bits >> 63) as u64) >> 1) as i64;
Self::from_bits(val as u64)
}
}

/// Returns the total encoded length (including null byte) for a value of type `T::Native`
pub const fn encoded_len<T>(_col: &PrimitiveArray<T>) -> usize
where
T: NativeType + FixedLengthEncoding,
{
T::ENCODED_LEN
}

/// Fixed width types are encoded as
///
/// - 1 byte `0` if null or `1` if valid
/// - bytes of [`FixedLengthEncoding`]
pub fn encode<T: FixedLengthEncoding, I: IntoIterator<Item = Option<T>>>(
out: &mut Rows,
i: I,
opts: SortOptions,
) {
for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
let end_offset = *offset + T::ENCODED_LEN;
if let Some(val) = maybe_val {
let to_write = &mut out.buffer[*offset..end_offset];
to_write[0] = 1;
let mut encoded = val.encode();
if opts.descending {
// Flip bits to reverse order
encoded.as_mut().iter_mut().for_each(|v| *v = !*v)
}
to_write[1..].copy_from_slice(encoded.as_ref())
} else {
out.buffer[*offset] = null_sentinel(opts);
}
*offset = end_offset;
}
}

0 comments on commit 562de6a

Please sign in to comment.