Split out arrow-buffer crate (#2594) (#2693)
* Split out arrow-buffer crate (#2594)

* Fix doc

* Review feedback

* Review feedback

* Use 64-bit wide collect_bool
tustvold committed Sep 15, 2022
1 parent 7594db6 commit fb01656
Showing 25 changed files with 542 additions and 547 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -18,10 +18,11 @@
[workspace]
members = [
"arrow",
"arrow-buffer",
"arrow-flight",
"parquet",
"parquet_derive",
"parquet_derive_test",
"arrow-flight",
"integration-testing",
"object_store",
]
47 changes: 47 additions & 0 deletions arrow-buffer/Cargo.toml
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "arrow-buffer"
version = "22.0.0"
description = "Buffer abstractions for Apache Arrow"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = ["arrow"]
include = [
"benches/*.rs",
"src/**/*.rs",
"Cargo.toml",
]
edition = "2021"
rust-version = "1.62"

[lib]
name = "arrow_buffer"
path = "src/lib.rs"
bench = false

[dependencies]
num = { version = "0.4", default-features = false, features = ["std"] }
half = { version = "2.0", default-features = false }

[dev-dependencies]
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }

[build-dependencies]
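
For illustration, a minimal sketch of what the split enables: buffer types usable without a dependency on the full `arrow` crate. This example is not part of the diff and assumes `Buffer` remains re-exported at `arrow_buffer::buffer`, as declared in the new `buffer/mod.rs` later in this commit.

```rust
use arrow_buffer::buffer::Buffer;

fn main() {
    // A Buffer can now be built directly from bytes with only arrow-buffer as a dependency.
    let buffer = Buffer::from(&[1u8, 2, 3, 4][..]);
    assert_eq!(buffer.len(), 4);
    assert_eq!(buffer.as_slice(), &[1u8, 2, 3, 4]);
}
```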
File renamed without changes.
32 changes: 11 additions & 21 deletions arrow/src/alloc/mod.rs → arrow-buffer/src/alloc/mod.rs
@@ -20,34 +20,29 @@

use std::alloc::{handle_alloc_error, Layout};
use std::fmt::{Debug, Formatter};
use std::mem::size_of;
use std::panic::RefUnwindSafe;
use std::ptr::NonNull;
use std::sync::Arc;

mod alignment;
mod types;

pub use alignment::ALIGNMENT;
pub use types::NativeType;

#[inline]
unsafe fn null_pointer<T: NativeType>() -> NonNull<T> {
NonNull::new_unchecked(ALIGNMENT as *mut T)
unsafe fn null_pointer() -> NonNull<u8> {
NonNull::new_unchecked(ALIGNMENT as *mut u8)
}

/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
/// an unknown or non-zero value and is semantically similar to `malloc`.
pub fn allocate_aligned<T: NativeType>(size: usize) -> NonNull<T> {
pub fn allocate_aligned(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
null_pointer()
} else {
let size = size * size_of::<T>();

let layout = Layout::from_size_align_unchecked(size, ALIGNMENT);
let raw_ptr = std::alloc::alloc(layout) as *mut T;
let raw_ptr = std::alloc::alloc(layout);
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
}
}
@@ -56,15 +51,13 @@ pub fn allocate_aligned<T: NativeType>(size: usize) -> NonNull<T> {
/// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
/// This is more performant than using [allocate_aligned] and setting all bytes to zero
/// and is semantically similar to `calloc`.
pub fn allocate_aligned_zeroed<T: NativeType>(size: usize) -> NonNull<T> {
pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
null_pointer()
} else {
let size = size * size_of::<T>();

let layout = Layout::from_size_align_unchecked(size, ALIGNMENT);
let raw_ptr = std::alloc::alloc_zeroed(layout) as *mut T;
let raw_ptr = std::alloc::alloc_zeroed(layout);
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
}
}
@@ -78,9 +71,8 @@ pub fn allocate_aligned_zeroed<T: NativeType>(size: usize) -> NonNull<T> {
/// * ptr must denote a block of memory currently allocated via this allocator,
///
/// * size must be the same size that was used to allocate that block of memory,
pub unsafe fn free_aligned<T: NativeType>(ptr: NonNull<T>, size: usize) {
pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
if ptr != null_pointer() {
let size = size * size_of::<T>();
std::alloc::dealloc(
ptr.as_ptr() as *mut u8,
Layout::from_size_align_unchecked(size, ALIGNMENT),
@@ -99,13 +91,11 @@ pub unsafe fn free_aligned<T: NativeType>(ptr: NonNull<T>, size: usize) {
///
/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
/// the rounded value must be less than usize::MAX).
pub unsafe fn reallocate<T: NativeType>(
ptr: NonNull<T>,
pub unsafe fn reallocate(
ptr: NonNull<u8>,
old_size: usize,
new_size: usize,
) -> NonNull<T> {
let old_size = old_size * size_of::<T>();
let new_size = new_size * size_of::<T>();
) -> NonNull<u8> {
if ptr == null_pointer() {
return allocate_aligned(new_size);
}
@@ -119,7 +109,7 @@ pub unsafe fn reallocate<T: NativeType>(
ptr.as_ptr() as *mut u8,
Layout::from_size_align_unchecked(old_size, ALIGNMENT),
new_size,
) as *mut T;
);
NonNull::new(raw_ptr).unwrap_or_else(|| {
handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT))
})
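After this change the allocation helpers work in raw bytes rather than counts of a generic `NativeType`, so callers multiply by `size_of` themselves. A minimal usage sketch (not from this commit), assuming the `alloc` module stays publicly exported from `arrow_buffer`:

```rust
use arrow_buffer::alloc::{allocate_aligned, free_aligned, ALIGNMENT};

fn main() {
    // Sizes are now expressed in bytes: holding 16 u32 values means asking for 64 bytes.
    let size_in_bytes = 16 * std::mem::size_of::<u32>();
    let ptr = allocate_aligned(size_in_bytes);
    assert_eq!(ptr.as_ptr() as usize % ALIGNMENT, 0);
    // SAFETY: `ptr` was allocated by `allocate_aligned` with this same size.
    unsafe { free_aligned(ptr, size_in_bytes) };
}
```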
@@ -23,7 +23,7 @@ use std::{convert::AsRef, usize};

use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bytes::Bytes, datatypes::ArrowNativeType};
use crate::{bytes::Bytes, native::ArrowNativeType};

use super::ops::bitwise_unary_op_helper;
use super::MutableBuffer;
@@ -271,7 +271,7 @@ impl Buffer {
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
/// # Example
/// ```
/// # use arrow::buffer::Buffer;
/// # use arrow_buffer::buffer::Buffer;
/// let v = vec![1u32];
/// let iter = v.iter().map(|x| x * 2);
/// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
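`from_trusted_len_iter` is `unsafe` because it trusts the iterator's `size_hint` upper bound when pre-allocating. A short sketch of that contract using the new crate path (illustrative, not from this diff):

```rust
use arrow_buffer::buffer::Buffer;

fn main() {
    let values = vec![1u32, 2, 3, 4];
    let doubled = values.iter().map(|x| x * 2);
    // SAFETY: iterators over a Vec report an exact upper bound in size_hint,
    // which is what `from_trusted_len_iter` relies on to allocate safely.
    let buffer = unsafe { Buffer::from_trusted_len_iter(doubled) };
    assert_eq!(buffer.len(), 16); // four u32 values, 4 bytes each
}
```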
29 changes: 29 additions & 0 deletions arrow-buffer/src/buffer/mod.rs
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains two main structs: [Buffer] and [MutableBuffer]. A buffer represents
//! a contiguous memory region that can be shared via `offsets`.

mod immutable;
pub use immutable::*;
mod mutable;
pub use mutable::*;
mod ops;
mod scalar;
pub use scalar::*;

pub use ops::*;
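
A brief sketch of the two structs this new module documents: building a `MutableBuffer`, freezing it into a `Buffer`, and slicing without copying. The paths follow the declarations above; the example itself is illustrative and not part of the commit.

```rust
use arrow_buffer::buffer::{Buffer, MutableBuffer};

fn main() {
    // Build up a mutable region, then freeze it into an immutable, shareable Buffer.
    let mut mutable = MutableBuffer::new(0);
    mutable.extend_from_slice(&[10u32, 20, 30]);
    let buffer: Buffer = mutable.into();
    assert_eq!(buffer.len(), 12); // three u32 values, 4 bytes each

    // Slicing advances the offset into the same allocation instead of copying it.
    let tail = buffer.slice(4);
    assert_eq!(tail.len(), 8);
}
```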
45 changes: 23 additions & 22 deletions arrow/src/buffer/mutable.rs → arrow-buffer/src/buffer/mutable.rs
@@ -20,7 +20,7 @@ use crate::alloc::Deallocation;
use crate::{
alloc,
bytes::Bytes,
datatypes::{ArrowNativeType, ToByteSlice},
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::ptr::NonNull;
@@ -31,12 +31,12 @@ use std::ptr::NonNull;
/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
/// to insert many items, and `into` to convert it to [`Buffer`].
///
/// For a safe, strongly typed API consider using [`crate::array::BufferBuilder`]
/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
///
/// # Example
///
/// ```
/// # use arrow::buffer::{Buffer, MutableBuffer};
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// buffer.extend_from_slice(&[1u32]);
@@ -75,7 +75,7 @@ impl MutableBuffer {
/// all bytes are guaranteed to be `0u8`.
/// # Example
/// ```
/// # use arrow::buffer::{Buffer, MutableBuffer};
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::from_len_zeroed(127);
/// assert_eq!(buffer.len(), 127);
/// assert!(buffer.capacity() >= 127);
@@ -131,7 +131,7 @@ impl MutableBuffer {
/// `self.len + additional > capacity`.
/// # Example
/// ```
/// # use arrow::buffer::{Buffer, MutableBuffer};
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.reserve(253); // allocates for the first time
/// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation
@@ -171,7 +171,7 @@ impl MutableBuffer {
/// growing it (potentially reallocating it) and writing `value` in the newly available bytes.
/// # Example
/// ```
/// # use arrow::buffer::{Buffer, MutableBuffer};
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.resize(253, 2); // allocates for the first time
/// assert_eq!(buffer.as_slice()[252], 2u8);
@@ -195,7 +195,7 @@ impl MutableBuffer {
///
/// # Example
/// ```
/// # use arrow::buffer::{Buffer, MutableBuffer};
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// // 2 cache lines
/// let mut buffer = MutableBuffer::new(128);
/// assert_eq!(buffer.capacity(), 128);
@@ -322,7 +322,7 @@ impl MutableBuffer {
/// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
/// # Example
/// ```
/// # use arrow::buffer::MutableBuffer;
/// # use arrow_buffer::buffer::MutableBuffer;
/// let mut buffer = MutableBuffer::new(0);
/// buffer.extend_from_slice(&[2u32, 0]);
/// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
@@ -346,7 +346,7 @@ impl MutableBuffer {
/// Extends the buffer with a new item, increasing its capacity if needed.
/// # Example
/// ```
/// # use arrow::buffer::MutableBuffer;
/// # use arrow_buffer::buffer::MutableBuffer;
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
@@ -384,7 +384,7 @@ impl MutableBuffer {
/// # Safety
/// The caller must ensure that the buffer was properly initialized up to `len`.
#[inline]
pub(crate) unsafe fn set_len(&mut self, len: usize) {
pub unsafe fn set_len(&mut self, len: usize) {
assert!(len <= self.capacity());
self.len = len;
}
@@ -394,16 +394,16 @@ impl MutableBuffer {
/// This is similar to `from_trusted_len_iter_bool`, however, can be significantly faster
/// as it eliminates the conditional `Iterator::next`
#[inline]
pub(crate) fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
let mut buffer = Self::new(bit_util::ceil(len, 8));
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
let mut buffer = Self::new(bit_util::ceil(len, 64) * 8);

let chunks = len / 8;
let remainder = len % 8;
let chunks = len / 64;
let remainder = len % 64;
for chunk in 0..chunks {
let mut packed = 0;
for bit_idx in 0..8 {
let i = bit_idx + chunk * 8;
packed |= (f(i) as u8) << bit_idx;
for bit_idx in 0..64 {
let i = bit_idx + chunk * 64;
packed |= (f(i) as u64) << bit_idx;
}

// SAFETY: Already allocated sufficient capacity
@@ -413,14 +413,15 @@ impl MutableBuffer {
if remainder != 0 {
let mut packed = 0;
for bit_idx in 0..remainder {
let i = bit_idx + chunks * 8;
packed |= (f(i) as u8) << bit_idx;
let i = bit_idx + chunks * 64;
packed |= (f(i) as u64) << bit_idx;
}

// SAFETY: Already allocated sufficient capacity
unsafe { buffer.push_unchecked(packed) }
}

buffer.truncate(bit_util::ceil(len, 8));
buffer
}
}
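
For context on the "64-bit wide" change above: bits are now packed into `u64` words instead of single bytes, and the final `truncate` trims the result back to `ceil(len / 8)` bytes. A standalone sketch of the same packing scheme (illustrative only, not the crate's API; names here are hypothetical):

```rust
// Pack `len` bits produced by `f` into bytes, accumulating 64 bits per iteration.
fn pack_bits<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Vec<u8> {
    let mut out = Vec::with_capacity((len + 63) / 64 * 8);
    for chunk in 0..(len / 64) {
        let mut word = 0u64;
        for bit in 0..64 {
            word |= (f(chunk * 64 + bit) as u64) << bit;
        }
        out.extend_from_slice(&word.to_le_bytes());
    }
    let remainder = len % 64;
    if remainder != 0 {
        let mut word = 0u64;
        for bit in 0..remainder {
            word |= (f((len / 64) * 64 + bit) as u64) << bit;
        }
        out.extend_from_slice(&word.to_le_bytes());
    }
    out.truncate((len + 7) / 8); // keep only ceil(len / 8) bytes
    out
}

fn main() {
    // 70 bits -> ceil(70 / 8) = 9 packed bytes; every third bit is set.
    let packed = pack_bits(70, |i| i % 3 == 0);
    assert_eq!(packed.len(), 9);
    assert_eq!(packed[0] & 1, 1); // bit 0 (0 % 3 == 0) is set
}
```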
@@ -484,7 +485,7 @@ impl MutableBuffer {
/// Prefer this to `collect` whenever possible, as it is ~60% faster.
/// # Example
/// ```
/// # use arrow::buffer::MutableBuffer;
/// # use arrow_buffer::buffer::MutableBuffer;
/// let v = vec![1u32];
/// let iter = v.iter().map(|x| x * 2);
/// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
@@ -525,10 +526,10 @@ impl MutableBuffer {
}

/// Creates a [`MutableBuffer`] from a boolean [`Iterator`] with a trusted (upper) length.
/// # use arrow::buffer::MutableBuffer;
/// # use arrow_buffer::buffer::MutableBuffer;
/// # Example
/// ```
/// # use arrow::buffer::MutableBuffer;
/// # use arrow_buffer::buffer::MutableBuffer;
/// let v = vec![false, true, false];
/// let iter = v.iter().map(|x| *x || true);
/// let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(iter) };
21 changes: 7 additions & 14 deletions arrow/src/buffer/ops.rs → arrow-buffer/src/buffer/ops.rs
@@ -20,26 +20,19 @@ use crate::util::bit_util::ceil;

/// Apply a bitwise operation `op` to four inputs and return the result as a Buffer.
/// The inputs are treated as bitmaps, meaning that offsets and length are specified in number of bits.
#[allow(clippy::too_many_arguments)]
pub(crate) fn bitwise_quaternary_op_helper<F>(
first: &Buffer,
first_offset_in_bits: usize,
second: &Buffer,
second_offset_in_bits: usize,
third: &Buffer,
third_offset_in_bits: usize,
fourth: &Buffer,
fourth_offset_in_bits: usize,
pub fn bitwise_quaternary_op_helper<F>(
buffers: [&Buffer; 4],
offsets: [usize; 4],
len_in_bits: usize,
op: F,
) -> Buffer
where
F: Fn(u64, u64, u64, u64) -> u64,
{
let first_chunks = first.bit_chunks(first_offset_in_bits, len_in_bits);
let second_chunks = second.bit_chunks(second_offset_in_bits, len_in_bits);
let third_chunks = third.bit_chunks(third_offset_in_bits, len_in_bits);
let fourth_chunks = fourth.bit_chunks(fourth_offset_in_bits, len_in_bits);
let first_chunks = buffers[0].bit_chunks(offsets[0], len_in_bits);
let second_chunks = buffers[1].bit_chunks(offsets[1], len_in_bits);
let third_chunks = buffers[2].bit_chunks(offsets[2], len_in_bits);
let fourth_chunks = buffers[3].bit_chunks(offsets[3], len_in_bits);

let chunks = first_chunks
.iter()
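The quaternary helper now takes the four buffers and their bit offsets as fixed-size arrays rather than eight positional arguments. A usage sketch (not from this commit), assuming the function is re-exported from `arrow_buffer::buffer` via the new `pub use ops::*`:

```rust
use arrow_buffer::buffer::{bitwise_quaternary_op_helper, Buffer};

fn main() {
    let a = Buffer::from(&[0b1010_1010u8][..]);
    let b = Buffer::from(&[0b1100_1100u8][..]);
    let c = Buffer::from(&[0b1111_0000u8][..]);
    let d = Buffer::from(&[0b0000_1111u8][..]);

    // Buffers and offsets are grouped into arrays; the op still works on u64 chunks.
    let out = bitwise_quaternary_op_helper([&a, &b, &c, &d], [0; 4], 8, |w, x, y, z| {
        (w & x) | (y & z)
    });
    assert_eq!(out.as_slice()[0], 0b1000_1000u8); // (a & b) | (c & d), bit by bit
}
```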
@@ -16,7 +16,7 @@
// under the License.

use crate::buffer::Buffer;
use crate::datatypes::ArrowNativeType;
use crate::native::ArrowNativeType;
use std::ops::Deref;

/// Provides a safe API for interpreting a [`Buffer`] as a slice of [`ArrowNativeType`]