Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track caller in tokio-util #4445

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/src/io/sync_bridge.rs
Expand Up @@ -88,6 +88,7 @@ impl<T: Unpin> SyncIoBridge<T> {
/// # Panic
///
/// This will panic if called outside the context of a Tokio runtime.
#[track_caller]
pub fn new(src: T) -> Self {
Self::new_with_handle(src, tokio::runtime::Handle::current())
}
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -104,6 +104,7 @@ mod util {
/// # }
/// ```
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
#[track_caller]
pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
Expand Down
11 changes: 7 additions & 4 deletions tokio-util/src/sync/mpsc.rs
Expand Up @@ -46,14 +46,16 @@ impl<T: Send + 'static> PollSender<T> {

/// Start sending a new item.
///
/// This method panics if a send is currently in progress. To ensure that no
/// send is in progress, call `poll_send_done` first until it returns
/// `Poll::Ready`.
///
/// If this method returns an error, that indicates that the channel is
/// closed. Note that this method is not guaranteed to return an error if
/// the channel is closed, but in that case the error would be reported by
/// the first call to `poll_send_done`.
///
/// # Panics
/// This method panics if a send is currently in progress. To ensure that no
/// send is in progress, call `poll_send_done` first until it returns
/// `Poll::Ready`.
#[track_caller]
pub fn start_send(&mut self, value: T) -> Result<(), SendError<T>> {
if self.is_sending {
panic!("start_send called while not ready.");
Expand Down Expand Up @@ -199,6 +201,7 @@ impl<T: Send + 'static> Sink<T> for PollSender<T> {
/// This is equivalent to calling [`start_send`].
///
/// [`start_send`]: PollSender::start_send
#[track_caller]
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
Pin::into_inner(self).start_send(item)
}
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/task/spawn_pinned.rs
Expand Up @@ -21,6 +21,7 @@ impl LocalPoolHandle {
///
/// # Panics
/// Panics if the pool size is less than one.
#[track_caller]
pub fn new(pool_size: usize) -> LocalPoolHandle {
assert!(pool_size > 0);

Expand Down
13 changes: 13 additions & 0 deletions tokio-util/src/time/delay_queue.rs
Expand Up @@ -187,6 +187,7 @@ impl<T> SlabStorage<T> {
}

// Inserts data into the inner slab and re-maps keys if necessary
#[track_caller]
pub(crate) fn insert(&mut self, val: Data<T>) -> Key {
let mut key = KeyInternal::new(self.inner.insert(val));
let key_contained = self.key_map.contains_key(&key.into());
Expand Down Expand Up @@ -305,6 +306,7 @@ impl<T> SlabStorage<T> {
self.compact_called = false;
}

#[track_caller]
pub(crate) fn reserve(&mut self, additional: usize) {
self.inner.reserve(additional);

Expand Down Expand Up @@ -346,6 +348,7 @@ where
impl<T> Index<Key> for SlabStorage<T> {
type Output = Data<T>;

#[track_caller]
fn index(&self, key: Key) -> &Self::Output {
let remapped_key = self.remap_key(&key);

Expand All @@ -357,6 +360,7 @@ impl<T> Index<Key> for SlabStorage<T> {
}

impl<T> IndexMut<Key> for SlabStorage<T> {
#[track_caller]
fn index_mut(&mut self, key: Key) -> &mut Data<T> {
let remapped_key = self.remap_key(&key);

Expand Down Expand Up @@ -533,6 +537,7 @@ impl<T> DelayQueue<T> {
/// [`reset`]: method@Self::reset
/// [`Key`]: struct@Key
/// [type]: #
#[track_caller]
pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");

Expand Down Expand Up @@ -656,10 +661,12 @@ impl<T> DelayQueue<T> {
/// [`reset`]: method@Self::reset
/// [`Key`]: struct@Key
/// [type]: #
#[track_caller]
pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
self.insert_at(value, Instant::now() + timeout)
}

#[track_caller]
fn insert_idx(&mut self, when: u64, key: Key) {
use self::wheel::{InsertError, Stack};

Expand All @@ -681,6 +688,7 @@ impl<T> DelayQueue<T> {
/// # Panics
///
/// Panics if the key is not contained in the expired queue or the wheel.
#[track_caller]
fn remove_key(&mut self, key: &Key) {
use crate::time::wheel::Stack;

Expand Down Expand Up @@ -720,6 +728,7 @@ impl<T> DelayQueue<T> {
/// assert_eq!(*item.get_ref(), "foo");
/// # }
/// ```
#[track_caller]
pub fn remove(&mut self, key: &Key) -> Expired<T> {
let prev_deadline = self.next_deadline();

Expand Down Expand Up @@ -776,6 +785,7 @@ impl<T> DelayQueue<T> {
/// // "foo" is now scheduled to be returned in 10 seconds
/// # }
/// ```
#[track_caller]
pub fn reset_at(&mut self, key: &Key, when: Instant) {
self.remove_key(key);

Expand Down Expand Up @@ -880,6 +890,7 @@ impl<T> DelayQueue<T> {
/// // "foo"is now scheduled to be returned in 10 seconds
/// # }
/// ```
#[track_caller]
pub fn reset(&mut self, key: &Key, timeout: Duration) {
self.reset_at(key, Instant::now() + timeout);
}
Expand Down Expand Up @@ -985,6 +996,7 @@ impl<T> DelayQueue<T> {
/// assert!(delay_queue.capacity() >= 11);
/// # }
/// ```
#[track_caller]
pub fn reserve(&mut self, additional: usize) {
self.slab.reserve(additional);
}
Expand Down Expand Up @@ -1124,6 +1136,7 @@ impl<T> wheel::Stack for Stack<T> {
}
}

#[track_caller]
fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
let key = *item;
assert!(store.contains(item));
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/time/wheel/mod.rs
Expand Up @@ -118,6 +118,7 @@ where
}

/// Remove `item` from the timing wheel.
#[track_caller]
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
let when = T::when(item, store);

Expand Down