
Commit

Add zero-copy make_mut (tokio-rs#695)
Sytten committed May 5, 2024
1 parent 0c17e99 commit 86694b0
Showing 4 changed files with 341 additions and 2 deletions.
150 changes: 149 additions & 1 deletion src/bytes.rs
@@ -15,7 +15,7 @@ use crate::buf::IntoIter;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::Buf;
use crate::{Buf, BytesMut};

/// A cheaply cloneable and sliceable chunk of contiguous memory.
///
@@ -113,6 +113,7 @@ pub(crate) struct Vtable {
///
/// takes `Bytes` to value
pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
@@ -507,6 +508,49 @@ impl Bytes {
self.truncate(0);
}

/// Try to convert self into `BytesMut`.
///
/// If `self` is unique for the entire original buffer, this will succeed
/// and return a `BytesMut` with the contents of `self` without copying.
/// If `self` is not unique for the entire original buffer, this will fail
/// and return `self` in the `Err` variant.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
Ok(self.make_mut())
} else {
Err(self)
}
}
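
A minimal sketch of the failure path (illustrative, not part of the diff): when a second handle to the same buffer exists, the zero-copy conversion is refused and the original `Bytes` comes back in the `Err` variant:

    use bytes::Bytes;

    let bytes = Bytes::from(b"hello".to_vec());
    let other = bytes.clone();
    // Two handles share the buffer, so the conversion fails and hands `self` back.
    let bytes = bytes.try_into_mut().unwrap_err();
    drop(other);
    // With the clone gone, the handle is unique again and the conversion succeeds.
    assert!(bytes.try_into_mut().is_ok());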

/// Convert self into `BytesMut`.
///
/// If `self` is unique for the entire original buffer, this will return a
/// `BytesMut` with the contents of `self` without copying.
/// If `self` is not unique for the entire original buffer, this will copy
/// the subset of the original buffer that `self` views into a new `BytesMut`.
///
/// # Examples
///
/// ```
/// use bytes::{Bytes, BytesMut};
///
/// let bytes = Bytes::from(b"hello".to_vec());
/// assert_eq!(bytes.make_mut(), BytesMut::from(&b"hello"[..]));
/// ```
pub fn make_mut(self) -> BytesMut {
let bytes = ManuallyDrop::new(self);
unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
}
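
A sketch of the non-unique case described above (illustrative, not part of the diff): with a clone alive, `make_mut` falls back to copying the bytes that `self` views, leaving the other handle untouched:

    use bytes::{Bytes, BytesMut};

    let bytes = Bytes::from(b"hello".to_vec());
    let other = bytes.clone();
    // `bytes` is not unique, so this copies into a fresh buffer.
    let mut buf: BytesMut = bytes.make_mut();
    buf.extend_from_slice(b" world");
    assert_eq!(&buf[..], b"hello world");
    assert_eq!(&other[..], b"hello");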

#[inline]
pub(crate) unsafe fn with_vtable(
ptr: *const u8,
@@ -917,6 +961,7 @@ impl fmt::Debug for Vtable {
const STATIC_VTABLE: Vtable = Vtable {
clone: static_clone,
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
drop: static_drop,
};
@@ -931,6 +976,11 @@ unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8
slice.to_vec()
}

unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let slice = slice::from_raw_parts(ptr, len);
BytesMut::from(slice)
}
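
For context, a short illustration (not part of the diff) of why the static vtable always copies: a `Bytes` created with `from_static` borrows a `'static` slice it can never own, so `make_mut` has to allocate:

    use bytes::Bytes;

    let s = Bytes::from_static(b"hello");
    // Static storage is never owned by the handle, so this always copies.
    let mut m = s.make_mut();
    m.extend_from_slice(b" world");
    assert_eq!(&m[..], b"hello world");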

fn static_is_unique(_: &AtomicPtr<()>) -> bool {
false
}
@@ -944,13 +994,15 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
drop: promotable_even_drop,
};

static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
clone: promotable_odd_clone,
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
drop: promotable_odd_drop,
};
@@ -994,12 +1046,47 @@ unsafe fn promotable_to_vec(
}
}

unsafe fn promotable_to_mut(
data: &AtomicPtr<()>,
ptr: *const u8,
len: usize,
f: fn(*mut ()) -> *mut u8,
) -> BytesMut {
let shared = data.load(Ordering::Acquire);
let kind = shared as usize & KIND_MASK;

if kind == KIND_ARC {
shared_to_mut_impl(shared.cast(), ptr, len)
} else {
// KIND_VEC is a view of an underlying buffer at a certain offset.
// The ptr + len always represents the end of that buffer.
// Before truncating it, it is first promoted to KIND_ARC.
// Thus, we can safely reconstruct a Vec from it without leaking memory.
debug_assert_eq!(kind, KIND_VEC);

let buf = f(shared);
let off = offset_from(ptr, buf);
let cap = off + len;
let v = Vec::from_raw_parts(buf, cap, cap);

let mut b = BytesMut::from_vec(v);
b.advance_unchecked(off);
b
}
}
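
A worked example of the KIND_VEC arithmetic above, with hypothetical values (not part of the diff): if the allocation returned by `f(shared)` starts at `buf` and this `Bytes` views 5 bytes starting 3 bytes in, then `off = offset_from(ptr, buf) = 3` and `cap = off + len = 8`. `Vec::from_raw_parts(buf, 8, 8)` therefore reclaims ownership of the buffer up to the end of the view, and `advance_unchecked(3)` restores the original 5-byte window without leaking the bytes in front of it.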

unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
promotable_to_vec(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}

unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| {
ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
})
}

unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1031,6 +1118,10 @@ unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize
promotable_to_vec(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
promotable_to_mut(data, ptr, len, |shared| shared.cast())
}

unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
data.with_mut(|shared| {
let shared = *shared;
@@ -1087,6 +1178,7 @@ const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignm
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_clone,
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
drop: shared_drop,
};
@@ -1133,6 +1225,45 @@ unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec
shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
// The goal is to check if the current handle is the only handle
// that currently has access to the buffer. This is done by
// checking if the `ref_cnt` is currently 1.
//
// The `Acquire` ordering synchronizes with the `Release` as
// part of the `fetch_sub` in `release_shared`. The `fetch_sub`
// operation guarantees that any mutations done in other threads
// are ordered before the `ref_cnt` is decremented. As such,
// this `Acquire` will guarantee that those mutations are
// visible to the current thread.
//
// Otherwise, we take the other branch, copy the data and call `release_shared`.
if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
// Deallocate the `Shared` instance without running its destructor.
let shared = *Box::from_raw(shared);
let shared = ManuallyDrop::new(shared);
let buf = shared.buf;
let cap = shared.cap;

// Rebuild Vec
let off = offset_from(ptr, buf);
let v = Vec::from_raw_parts(buf, len + off, cap);

let mut b = BytesMut::from_vec(v);
b.advance_unchecked(off);
b
} else {
// Copy the data from Shared in a new Vec, then release it
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}
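
A sketch of the unique branch (not part of the diff; the pointer comparison is an implementation detail used here only to show that the allocation is reused rather than copied):

    use bytes::Bytes;

    let bytes = Bytes::from(b"hello world".to_vec());
    let clone = bytes.clone();   // ref_cnt == 2
    drop(clone);                 // ref_cnt drops back to 1
    let p = bytes.as_ptr();
    // Unique again: the original allocation is handed to the BytesMut.
    let buf = bytes.make_mut();
    assert_eq!(buf.as_ptr(), p);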

unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
}

pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
let shared = data.load(Ordering::Acquire);
let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
@@ -1291,6 +1422,23 @@ where
new_addr as *mut u8
}

/// Precondition: dst >= original
///
/// The following line is equivalent to:
///
/// ```rust,ignore
/// self.ptr.as_ptr().offset_from(ptr) as usize;
/// ```
///
/// But since the minimum supported Rust version is 1.39 and
/// `pointer::offset_from` was only stabilized in 1.47, we cannot use it.
#[inline]
fn offset_from(dst: *const u8, original: *const u8) -> usize {
debug_assert!(dst >= original);

dst as usize - original as usize
}
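
A tiny illustration of the helper (a sketch of how it could be exercised from a unit test inside this module, not part of the diff):

    let data = [0u8; 8];
    let base = data.as_ptr();
    let dst = unsafe { base.add(5) };
    assert_eq!(offset_from(dst, base), 5);
    // On Rust >= 1.47 this matches `unsafe { dst.offset_from(base) } as usize`.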

// compile-fails

/// ```compile_fail
32 changes: 31 additions & 1 deletion src/bytes_mut.rs
@@ -868,7 +868,7 @@ impl BytesMut {
/// # SAFETY
///
/// The caller must ensure that `count` <= `self.cap`.
unsafe fn advance_unchecked(&mut self, count: usize) {
pub(crate) unsafe fn advance_unchecked(&mut self, count: usize) {
// Setting the start to 0 is a no-op, so return early if this is the
// case.
if count == 0 {
@@ -1713,6 +1713,7 @@ unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize)
static SHARED_VTABLE: Vtable = Vtable {
clone: shared_v_clone,
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: crate::bytes::shared_is_unique,
drop: shared_v_drop,
};
@@ -1747,6 +1748,35 @@ unsafe fn shared_v_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> V
}
}

unsafe fn shared_v_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
let shared: *mut Shared = data.load(Ordering::Relaxed).cast();

if (*shared).is_unique() {
let shared = &mut *shared;

// The capacity is always the original capacity of the buffer
// minus the offset from the start of the buffer
let v = &mut shared.vec;
let v_capacity = v.capacity();
let v_ptr = v.as_mut_ptr();
let offset = offset_from(ptr as *mut u8, v_ptr);
let cap = v_capacity - offset;

let ptr = vptr(ptr as *mut u8);

BytesMut {
ptr,
len,
cap,
data: shared,
}
} else {
let v = slice::from_raw_parts(ptr, len).to_vec();
release_shared(shared);
BytesMut::from_vec(v)
}
}
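
A sketch of how this path can be reached from the public API (illustrative, not part of the diff): splitting a `BytesMut` promotes it to shared storage, so freezing one half and calling `make_mut` while the other half is still alive takes the copying branch:

    use bytes::BytesMut;

    let mut buf = BytesMut::from(&b"hello world"[..]);
    let tail = buf.split_off(5);   // both halves now share one allocation
    let frozen = tail.freeze();
    // `buf` still references the allocation, so this handle is not unique
    // and the viewed bytes are copied into a fresh BytesMut.
    let copied = frozen.make_mut();
    assert_eq!(&copied[..], b" world");
    assert_eq!(&buf[..], b"hello");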

unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
data.with_mut(|shared| {
release_shared(*shared as *mut Shared);
