feat: Add zero-copy make_mut

Sytten committed Apr 14, 2024
1 parent 327615e commit 6ff8eb4
Showing 3 changed files with 261 additions and 6 deletions.
139 changes: 138 additions & 1 deletion src/bytes.rs
@@ -15,7 +15,7 @@ use crate::buf::IntoIter;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
-use crate::Buf;
+use crate::{Buf, BytesMut};

/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
@@ -113,6 +113,7 @@ pub(crate) struct Vtable {
///
/// takes `Bytes` to value
pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
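/// takes `Bytes` to value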
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
@@ -507,6 +508,46 @@ impl Bytes {
self.truncate(0);
}

/// Try to convert `self` into `BytesMut`.
///
/// If `self` is unique, this succeeds and returns a `BytesMut` with the
/// contents of `self` without copying. If `self` is not unique, it fails
/// and returns `self` unchanged.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
Ok(self.make_mut())
} else {
Err(self)
}
}
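For contrast, a minimal sketch of the failure path (illustration only, using just the API added here): cloning creates a second handle, so the uniqueness check fails and the original `Bytes` comes back in the `Err` variant.

```rust
use bytes::Bytes;

let a = Bytes::from(b"hello".to_vec());
let b = a.clone(); // a second handle: `a` is no longer unique
let a = a.try_into_mut().unwrap_err(); // fails and returns the original
assert_eq!(a, b);
```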

/// Convert `self` into `BytesMut`.
///
/// If `self` is unique, this returns a `BytesMut` with the contents of
/// `self` without copying. If `self` is not unique, it makes a copy of
/// the data.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.make_mut(), BytesMut::from(&b"hello"[..]));
/// ```
pub fn make_mut(self) -> BytesMut {
let bytes = ManuallyDrop::new(self);
unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
}
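And a sketch of the copying path: with another handle alive, `make_mut` cannot reclaim the buffer, so it falls back to copying, leaving the other handle untouched.

```rust
use bytes::{Bytes, BytesMut};

let a = Bytes::from(b"hello".to_vec());
let b = a.clone(); // shared: `a` is not unique
let mut m = a.make_mut(); // copies the data instead of reclaiming it
m[0] = b'H';
assert_eq!(m, BytesMut::from(&b"Hello"[..]));
assert_eq!(b, Bytes::from(b"hello".to_vec())); // `b` still sees the original
```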

#[inline]
pub(crate) unsafe fn with_vtable(
ptr: *const u8,
@@ -917,6 +958,7 @@ impl fmt::Debug for Vtable {
const STATIC_VTABLE: Vtable = Vtable {
clone: static_clone,
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
drop: static_drop,
};
@@ -931,6 +973,11 @@ unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8
slice.to_vec()
}

unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let slice = slice::from_raw_parts(ptr, len);
BytesMut::from(slice)
}
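Static data lives in the binary and can never be reclaimed as a `Vec`, so this vtable entry always copies. A small sketch (the pointer inequality is an assumption about allocation behavior, not an API guarantee):

```rust
use bytes::Bytes;

let s = Bytes::from_static(b"static");
let p = s.as_ptr();
let m = s.make_mut(); // must copy out of the static slice
assert_ne!(m.as_ptr(), p); // the copy is a fresh heap allocation
```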

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
false
}
@@ -944,13 +991,15 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
drop: promotable_even_drop,
};

static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
clone: promotable_odd_clone,
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
drop: promotable_odd_drop,
};
@@ -994,12 +1043,41 @@ unsafe fn promotable_to_vec(
}
}

unsafe fn promotable_to_mut(
data: &AtomicPtr<()>,
ptr: *const u8,
len: usize,
f: fn(*mut ()) -> *mut u8,
) -> BytesMut {
let shared = data.load(Ordering::Acquire);
let kind = shared as usize & KIND_MASK;

if kind == KIND_ARC {
shared_to_mut_impl(shared.cast(), ptr, len)
} else {
debug_assert_eq!(kind, KIND_VEC);

let buf = f(shared);
let off = offset_from(ptr, buf);
let cap = off + len;
let v = Vec::from_raw_parts(buf, cap, cap);

BytesMut::from_vec_offset(v, off)
}
}
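The `KIND_VEC` arithmetic leans on an invariant of the promotable representation: the view always extends to the end of the allocation (truncating a promotable `Bytes` promotes it first), so only the front can have been consumed, and `off + len` recovers the allocation's full length. A layout sketch, assuming the front was consumed with `Buf::advance`:

```rust
// buf                ptr
//  |                  |
//  v                  v
//  [ .  .  .  .  .  . . . . . . ]
//  |<----- off ---->|<-- len -->|
//  |<---- cap = off + len ----->|
//
// Vec::from_raw_parts(buf, cap, cap) rebuilds the original allocation;
// BytesMut::from_vec_offset(v, off) then hides the first `off` bytes again.
```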

unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
promotable_to_vec(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}

unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}
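The closure passed in recovers the buffer address for the even vtable: an even-aligned allocation leaves the pointer's low bit free to store the kind flag, so masking it off yields the original pointer (the odd vtable below needs no mask, since an odd address already carries `KIND_VEC` in its low bit). A sketch of that encoding:

```rust
// even vtable, KIND_VEC: data = buffer_address | KIND_VEC (low bit borrowed)
// recovering the buffer:  buf = data & !KIND_MASK
// odd vtable, KIND_VEC:   data = buffer_address (low bit is already 1)
```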

unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1031,6 +1109,10 @@ unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize
promotable_to_vec(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1087,6 +1169,7 @@ const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignm
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_clone,
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
drop: shared_drop,
};
@@ -1133,6 +1216,43 @@ unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec
shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
// The goal is to check if the current handle is the only handle
// that currently has access to the buffer. This is done by
// checking if the `ref_cnt` is currently 1.
//
// The `Acquire` ordering synchronizes with the `Release` as
// part of the `fetch_sub` in `release_shared`. The `fetch_sub`
// operation guarantees that any mutations done in other threads
// are ordered before the `ref_cnt` is decremented. As such,
// this `Acquire` will guarantee that those mutations are
// visible to the current thread.
//
// Otherwise, we take the other branch, copy the data and call `release_shared`.
if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
// Deallocate the `Shared` instance without running its destructor.
let shared = *Box::from_raw(shared);
let shared = ManuallyDrop::new(shared);
let buf = shared.buf;
let cap = shared.cap;

// Rebuild Vec
let off = offset_from(ptr, buf);
let len = len + off;
let v = Vec::from_raw_parts(buf, len, cap);

BytesMut::from_vec_offset(v, off)
} else {
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}

unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}
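Putting the unique branch to work, a sketch of a zero-copy round trip (the pointer-equality check reflects how the reclaim path behaves, not a documented guarantee):

```rust
use bytes::Bytes;

let a = Bytes::from(vec![1u8, 2, 3]);
let b = a.clone(); // promotes to the shared representation, ref_cnt = 2
drop(b); // back down to ref_cnt = 1
let p = a.as_ptr();
let m = a.make_mut(); // unique: the buffer is reclaimed, not copied
assert_eq!(m.as_ptr(), p); // same allocation
```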

pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
let shared = data.load(Ordering::Acquire);
let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
@@ -1291,6 +1411,23 @@ where
new_addr as *mut u8
}

/// Precondition: dst >= original
///
/// The following line is equivalent to:
///
/// ```rust,ignore
/// dst.offset_from(original) as usize
/// ```
///
/// But because the minimum supported Rust version is 1.39 and
/// `pointer::offset_from` was only stabilized in 1.47, we cannot use it.
#[inline]
fn offset_from(dst: *const u8, original: *const u8) -> usize {
debug_assert!(dst >= original);

dst as usize - original as usize
}
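A usage sketch of the helper under its stated precondition:

```rust
let buf = [0u8; 8];
let base = buf.as_ptr();
let dst = unsafe { base.add(5) };
assert_eq!(offset_from(dst, base), 5);
```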

// compile-fails

/// ```compile_fail
40 changes: 35 additions & 5 deletions src/bytes_mut.rs
@@ -829,13 +829,20 @@ impl BytesMut {
// suddenly a lot more expensive.
#[inline]
pub(crate) fn from_vec(vec: Vec<u8>) -> BytesMut {
unsafe { BytesMut::from_vec_offset(vec, 0) }
}

#[inline]
pub(crate) unsafe fn from_vec_offset(vec: Vec<u8>, off: usize) -> BytesMut {
let mut vec = ManuallyDrop::new(vec);
-let ptr = vptr(vec.as_mut_ptr());
-let len = vec.len();
-let cap = vec.capacity();
+let ptr = vptr(vec.as_mut_ptr().add(off));
+let len = vec.len().checked_sub(off).unwrap_or(0);
+let cap = vec.capacity() - off;

-let original_capacity_repr = original_capacity_to_repr(cap);
-let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
+let original_capacity_repr = original_capacity_to_repr(vec.capacity());
+let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET)
+| (off << VEC_POS_OFFSET)
+| KIND_VEC;

BytesMut {
ptr,
@@ -1698,6 +1705,7 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize)
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_v_clone,
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: crate::bytes::shared_is_unique,
drop: shared_v_drop,
};
@@ -1732,6 +1740,28 @@ unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> V
}
}

unsafe fn shared_v_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let shared: *mut Shared = data.load(Ordering::Relaxed).cast();

if (*shared).is_unique() {
let shared = &mut *shared;

let ptr = vptr(ptr as *mut u8);
let cap = len; // It will try to reclaim the buffer on first insert

BytesMut {
ptr,
len,
cap,
data: shared,
}
} else {
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}
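To land on this vtable the handle must already be in the `KIND_ARC` state before freezing; a sketch that forces the promotion with `split_off`, mirroring the trick the tests below use:

```rust
use bytes::BytesMut;

let mut m = BytesMut::from(&b"abcdef"[..]);
let tail = m.split_off(3); // forces the shared (KIND_ARC) representation
drop(tail); // the remaining handle is unique again
let frozen = m.freeze(); // Bytes backed by this SHARED_VTABLE
let m2 = frozen.make_mut(); // unique: handed back without copying
assert_eq!(&m2[..], b"abc");
```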

unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
data.with_mut(|shared| {
release_shared(*shared as *mut Shared);
88 changes: 88 additions & 0 deletions tests/test_bytes.rs
@@ -1172,3 +1172,91 @@ fn shared_is_unique() {
drop(b);
assert!(c.is_unique());
}

#[test]
fn test_bytes_make_mut() {
// Test STATIC_VTABLE.to_mut
let bs = b"1b23exfcz3r";
let bs_long = b"1b23exfcz3r1b23exfcz3r";
let bytes_mut = Bytes::from_static(bs).make_mut();
assert_eq!(bytes_mut, bs[..]);

// Test make_mut on a Bytes frozen from a plain BytesMut
let mut bytes_mut: BytesMut = bs[..].into();
bytes_mut = bytes_mut.freeze().make_mut();
assert_eq!(bytes_mut, bs[..]);
bytes_mut.extend_from_slice(&bs[..]);
assert_eq!(bytes_mut, bs_long[..]);

// Set kind to KIND_ARC so that after freeze, Bytes will use bytes_mut.SHARED_VTABLE
let mut bytes_mut: BytesMut = bs[..].into();
eprintln!("2");
drop(bytes_mut.split_off(bs.len()));

eprintln!("3");
let b1 = bytes_mut.freeze();
eprintln!("4");
let b2 = b1.clone();

eprintln!("{:#?}", (&*b1).as_ptr());

// shared.is_unique() == false
let mut b1m = b1.make_mut();
assert_eq!(b1m, bs[..]);
b1m[0] = b'9';

// shared.is_unique() == true
let b2m = b2.make_mut();
assert_eq!(b2m, bs[..]);

// Test bytes_mut.SHARED_VTABLE.to_mut impl where offset != 0
let mut bytes_mut1: BytesMut = bs[..].into();
let bytes_mut2 = bytes_mut1.split_off(9);

let b1 = bytes_mut1.freeze();
let b2 = bytes_mut2.freeze();

let b1m = b1.make_mut();
let b2m = b2.make_mut();

assert_eq!(b2m, bs[9..]);
assert_eq!(b1m, bs[..9]);
}

#[test]
fn test_bytes_make_mut_promotable_even() {
let vec = vec![33u8; 1024];

// Test cases where kind == KIND_VEC
let b1 = Bytes::from(vec.clone());
let b1m = b1.make_mut();
assert_eq!(b1m, vec);

// Test cases where kind == KIND_ARC, ref_cnt == 1
let b1 = Bytes::from(vec.clone());
drop(b1.clone());
let b1m = b1.make_mut();
assert_eq!(b1m, vec);

// Test cases where kind == KIND_ARC, ref_cnt == 2
let b1 = Bytes::from(vec.clone());
let b2 = b1.clone();
let b1m = b1.make_mut();
assert_eq!(b1m, vec);

// Test cases where vtable = SHARED_VTABLE, kind == KIND_ARC, ref_cnt == 1
let b2m = b2.make_mut();
assert_eq!(b2m, vec);

// Test cases where offset != 0
let mut b1 = Bytes::from(vec.clone());
let b2 = b1.split_off(20);
let b1m = b1.make_mut();
let b2m = b2.make_mut();

assert_eq!(b2m, vec[20..]);
assert_eq!(b1m, vec[..20]);
}
