Skip to content

Commit

Permalink
Add COW conversion for Buffer and PrimitiveArray and unary_mut (apach…
Browse files Browse the repository at this point in the history
…e#3115)

* Add some APIs for copy-on-write support

* Update

* Add unary_mut as an example

* For review

* For review

* For review

* Fix test and more for review

* Add test on sliced array

* Address an overlooking review.

* For review
  • Loading branch information
viirya committed Nov 17, 2022
1 parent 2a065be commit 5bce104
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 2 deletions.
12 changes: 12 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::downcast_array;
use arrow_schema::Field;

#[test]
Expand Down Expand Up @@ -1113,4 +1114,15 @@ mod tests {
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}

#[test]
fn test_downcast_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let array: Int32Array = downcast_array(&boxed);

let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(array, expected);
}
}
161 changes: 160 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
}

/// Applies an unary and infallible function to a mutable primitive array.
/// Mutable primitive array means that the buffer is not shared with other arrays.
/// As a result, this mutates the buffer directly without allocating new buffer.
///
/// # Implementation
///
/// This will apply the function for all values, including those on null slots.
/// This implies that the operation must be infallible for any value of the corresponding type
/// or this function may panic.
/// # Example
/// ```rust
/// # use arrow_array::{Int32Array, types::Int32Type};
/// # fn main() {
/// let array = Int32Array::from(vec![Some(5), Some(7), None]);
/// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, PrimitiveArray<T>>
where
F: Fn(T::Native) -> T::Native,
{
let mut builder = self.into_builder()?;
builder
.values_slice_mut()
.iter_mut()
.for_each(|v| *v = op(*v));
Ok(builder.finish())
}

/// Applies a unary and fallible function to all valid values in a primitive array
///
/// This is unlike [`Self::unary`] which will apply an infallible function to all rows
Expand Down Expand Up @@ -489,6 +519,66 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}

/// Returns `PrimitiveBuilder` of this primitive array for mutating its values if the underlying
/// data buffer is not shared by others.
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
let len = self.len();
let null_bit_buffer = self
.data
.null_buffer()
.map(|b| b.bit_slice(self.data.offset(), len));

let element_len = std::mem::size_of::<T::Native>();
let buffer = self.data.buffers()[0]
.slice_with_length(self.data.offset() * element_len, len * element_len);

drop(self.data);

let try_mutable_null_buffer = match null_bit_buffer {
None => Ok(None),
Some(null_buffer) => {
// Null buffer exists, tries to make it mutable
null_buffer.into_mutable().map(Some)
}
};

let try_mutable_buffers = match try_mutable_null_buffer {
Ok(mutable_null_buffer) => {
// Got mutable null buffer, tries to get mutable value buffer
let try_mutable_buffer = buffer.into_mutable();

// try_mutable_buffer.map(...).map_err(...) doesn't work as the compiler complains
// mutable_null_buffer is moved into map closure.
match try_mutable_buffer {
Ok(mutable_buffer) => Ok(PrimitiveBuilder::<T>::new_from_buffer(
mutable_buffer,
mutable_null_buffer,
)),
Err(buffer) => Err((buffer, mutable_null_buffer.map(|b| b.into()))),
}
}
Err(mutable_null_buffer) => {
// Unable to get mutable null buffer
Err((buffer, Some(mutable_null_buffer)))
}
};

match try_mutable_buffers {
Ok(builder) => Ok(builder),
Err((buffer, null_bit_buffer)) => {
let builder = ArrayData::builder(T::DATA_TYPE)
.len(len)
.add_buffer(buffer)
.null_bit_buffer(null_bit_buffer);

let array_data = unsafe { builder.build_unchecked() };
let array = PrimitiveArray::<T>::from(array_data);

Err(array)
}
}
}
}

#[inline]
Expand Down Expand Up @@ -1036,7 +1126,9 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
use crate::BooleanArray;
use crate::cast::downcast_array;
use crate::{ArrayRef, BooleanArray};
use std::sync::Arc;

#[test]
fn test_primitive_array_from_vec() {
Expand Down Expand Up @@ -1939,4 +2031,71 @@ mod tests {

array.value(4);
}

#[test]
fn test_into_builder() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let col: Int32Array = downcast_array(&boxed);
drop(boxed);

let mut builder = col.into_builder().unwrap();

let slice = builder.values_slice_mut();
assert_eq!(slice, &[1, 2, 3]);

slice[0] = 4;
slice[1] = 2;
slice[2] = 1;

let expected: Int32Array = vec![Some(4), Some(2), Some(1)].into_iter().collect();

let new_array = builder.finish();
assert_eq!(expected, new_array);
}

#[test]
fn test_into_builder_cloned_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);

let col: Int32Array = PrimitiveArray::<Int32Type>::from(boxed.data().clone());
let err = col.into_builder();

match err {
Ok(_) => panic!("Should not get builder from cloned array"),
Err(returned) => {
let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(expected, returned)
}
}
}

#[test]
fn test_into_builder_on_sliced_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
let slice = array.slice(1, 2);
let col: Int32Array = downcast_array(&slice);

drop(slice);

col.into_builder()
.expect_err("Should not build builder from sliced array");
}

#[test]
fn test_unary_mut() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let c = array.unary_mut(|x| x * 2 + 1).unwrap();
let expected: Int32Array = vec![3, 5, 7].into_iter().map(Some).collect();

assert_eq!(expected, c);

let array: Int32Array = Int32Array::from(vec![Some(5), Some(7), None]);
let c = array.unary_mut(|x| x * 2 + 1).unwrap();
assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
}
}
5 changes: 5 additions & 0 deletions arrow-array/src/builder/boolean_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
assert!(len <= buffer.len() * 8);
Self { buffer, len }
}

#[inline]
pub fn len(&self) -> usize {
self.len
Expand Down
9 changes: 9 additions & 0 deletions arrow-array/src/builder/buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
let buffer_len = buffer.len();
Self {
buffer,
len: buffer_len / std::mem::size_of::<T>(),
_marker: PhantomData,
}
}

/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
Expand Down
25 changes: 24 additions & 1 deletion arrow-array/src/builder/null_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::builder::BooleanBufferBuilder;
use arrow_buffer::Buffer;
use arrow_buffer::{Buffer, MutableBuffer};

/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
Expand All @@ -42,6 +42,29 @@ impl NullBufferBuilder {
}
}

/// Creates a new builder with given length.
pub fn new_with_len(len: usize) -> Self {
Self {
bitmap_builder: None,
len,
capacity: len,
}
}

/// Creates a new builder from a `MutableBuffer`.
pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
let capacity = buffer.len() * 8;

assert!(len < capacity);

let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer, len));
Self {
bitmap_builder,
len,
capacity,
}
}

/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
Expand Down
24 changes: 24 additions & 0 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -114,6 +115,24 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}

pub fn new_from_buffer(
values_buffer: MutableBuffer,
null_buffer: Option<MutableBuffer>,
) -> Self {
let values_builder = BufferBuilder::<T::Native>::new_from_buffer(values_buffer);

let null_buffer_builder = null_buffer
.map(|buffer| {
NullBufferBuilder::new_from_buffer(buffer, values_builder.len())
})
.unwrap_or_else(|| NullBufferBuilder::new_with_len(values_builder.len()));

Self {
values_builder,
null_buffer_builder,
}
}

/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
Expand Down Expand Up @@ -204,6 +223,11 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
pub fn values_slice(&self) -> &[T::Native] {
self.values_builder.as_slice()
}

/// Returns the current values buffer as a mutable slice
pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
self.values_builder.as_slice_mut()
}
}

#[cfg(test)]
Expand Down
19 changes: 19 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,25 @@ impl Buffer {
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
let offset_ptr = self.as_ptr();
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.and_then(|bytes| {
// The pointer of underlying buffer should not be offset.
assert_eq!(offset_ptr, bytes.ptr().as_ptr());
MutableBuffer::from_bytes(bytes).map_err(Arc::new)
})
.map_err(|bytes| Buffer {
data: bytes,
offset,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down
19 changes: 19 additions & 0 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
Expand Down Expand Up @@ -92,6 +93,24 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given `Bytes`.
pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
return Err(bytes);
}

let len = bytes.len();
let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);

Ok(Self {
data: ptr,
len,
capacity,
})
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
Expand Down
5 changes: 5 additions & 0 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Bytes {
Deallocation::Custom(_) => 0,
}
}

#[inline]
pub(crate) fn deallocation(&self) -> &Deallocation {
&self.deallocation
}
}

// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
Expand Down

0 comments on commit 5bce104

Please sign in to comment.