Skip to content

Commit

Permalink
fs: use chunks in fs::read_dir (#5309)
Browse files Browse the repository at this point in the history
  • Loading branch information
icedrocket committed Dec 28, 2022
1 parent 9af2f5e commit 4a4f80c
Showing 1 changed file with 73 additions and 18 deletions.
91 changes: 73 additions & 18 deletions tokio/src/fs/read_dir.rs
@@ -1,5 +1,6 @@
use crate::fs::asyncify;

use std::collections::VecDeque;
use std::ffi::OsString;
use std::fs::{FileType, Metadata};
use std::future::Future;
Expand All @@ -19,6 +20,8 @@ use crate::blocking::spawn_blocking;
#[cfg(not(test))]
use crate::blocking::JoinHandle;

const CHUNK_SIZE: usize = 32;

/// Returns a stream over the entries within a directory.
///
/// This is an async version of [`std::fs::read_dir`](std::fs::read_dir)
Expand All @@ -29,9 +32,14 @@ use crate::blocking::JoinHandle;
/// [`spawn_blocking`]: crate::task::spawn_blocking
pub async fn read_dir(path: impl AsRef<Path>) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned();
let std = asyncify(|| std::fs::read_dir(path)).await?;
asyncify(|| -> io::Result<ReadDir> {
let mut std = std::fs::read_dir(path)?;
let mut buf = VecDeque::with_capacity(CHUNK_SIZE);
ReadDir::next_chunk(&mut buf, &mut std);

Ok(ReadDir(State::Idle(Some(std))))
Ok(ReadDir(State::Idle(Some((buf, std)))))
})
.await
}

/// Reads the entries in a directory.
Expand All @@ -58,8 +66,8 @@ pub struct ReadDir(State);

#[derive(Debug)]
enum State {
Idle(Option<std::fs::ReadDir>),
Pending(JoinHandle<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
Idle(Option<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir)>),
Pending(JoinHandle<(VecDeque<io::Result<DirEntry>>, std::fs::ReadDir)>),
}

impl ReadDir {
Expand Down Expand Up @@ -94,29 +102,57 @@ impl ReadDir {
pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
State::Idle(ref mut std) => {
let mut std = std.take().unwrap();
State::Idle(ref mut data) => {
let (buf, _) = data.as_mut().unwrap();

if let Some(ent) = buf.pop_front() {
return Poll::Ready(ent.map(Some));
};

let (mut buf, mut std) = data.take().unwrap();

self.0 = State::Pending(spawn_blocking(move || {
let ret = std.next();
(ret, std)
ReadDir::next_chunk(&mut buf, &mut std);
(buf, std)
}));
}
State::Pending(ref mut rx) => {
let (ret, std) = ready!(Pin::new(rx).poll(cx))?;
self.0 = State::Idle(Some(std));
let (mut buf, std) = ready!(Pin::new(rx).poll(cx))?;

let ret = match ret {
Some(Ok(std)) => Ok(Some(DirEntry(Arc::new(std)))),
let ret = match buf.pop_front() {
Some(Ok(x)) => Ok(Some(x)),
Some(Err(e)) => Err(e),
None => Ok(None),
};

self.0 = State::Idle(Some((buf, std)));

return Poll::Ready(ret);
}
}
}
}

fn next_chunk(buf: &mut VecDeque<io::Result<DirEntry>>, std: &mut std::fs::ReadDir) {
for ret in std.by_ref().take(CHUNK_SIZE) {
let success = ret.is_ok();

buf.push_back(ret.map(|std| DirEntry {
#[cfg(not(any(
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
)))]
file_type: std.file_type().ok(),
std: Arc::new(std),
}));

if !success {
break;
}
}
}
}

feature! {
Expand Down Expand Up @@ -160,7 +196,16 @@ feature! {
/// filesystem. Each entry can be inspected via methods to learn about the full
/// path or possibly other metadata through per-platform extension traits.
#[derive(Debug)]
pub struct DirEntry(Arc<std::fs::DirEntry>);
pub struct DirEntry {
#[cfg(not(any(
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
)))]
file_type: Option<FileType>,
std: Arc<std::fs::DirEntry>,
}

impl DirEntry {
/// Returns the full path to the file that this entry represents.
Expand Down Expand Up @@ -193,7 +238,7 @@ impl DirEntry {
///
/// The exact text, of course, depends on what files you have in `.`.
pub fn path(&self) -> PathBuf {
self.0.path()
self.std.path()
}

/// Returns the bare file name of this directory entry without any other
Expand All @@ -214,7 +259,7 @@ impl DirEntry {
/// # }
/// ```
pub fn file_name(&self) -> OsString {
self.0.file_name()
self.std.file_name()
}

/// Returns the metadata for the file that this entry points at.
Expand Down Expand Up @@ -248,7 +293,7 @@ impl DirEntry {
/// # }
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let std = self.0.clone();
let std = self.std.clone();
asyncify(move || std.metadata()).await
}

Expand Down Expand Up @@ -283,13 +328,23 @@ impl DirEntry {
/// # }
/// ```
pub async fn file_type(&self) -> io::Result<FileType> {
let std = self.0.clone();
#[cfg(not(any(
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
)))]
if let Some(file_type) = self.file_type {
return Ok(file_type);
}

let std = self.std.clone();
asyncify(move || std.file_type()).await
}

/// Returns a reference to the underlying `std::fs::DirEntry`.
#[cfg(unix)]
pub(super) fn as_inner(&self) -> &std::fs::DirEntry {
&self.0
&self.std
}
}

0 comments on commit 4a4f80c

Please sign in to comment.