Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify PollWatcher to support pseudo filesystems like sysfs/procfs #396

Merged
merged 13 commits into from Apr 28, 2022
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Expand Up @@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
version:
- 1.47.0 # MSRV
- 1.50.0 # MSRV
- stable
- nightly
os: [ubuntu-latest, macos-latest, windows-latest]
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -49,6 +49,7 @@ mio = { version = "0.8", features = ["os-ext"] }
futures = "0.3"
serde_json = "1.0.39"
tempfile = "3.2.0"
nix = "0.23.1"

[features]
default = ["macos_fsevent"]
Expand Down
54 changes: 54 additions & 0 deletions examples/poll_sysfs.rs
@@ -0,0 +1,54 @@
use notify::poll::PollWatcherConfig;
use notify::{PollWatcher, RecursiveMode, Watcher};
use std::path::Path;
use std::time::Duration;

#[cfg(not(target_os = "windows"))]
fn not_windows_main() -> notify::Result<()> {
let mut paths: Vec<_> = std::env::args()
.skip(1)
.map(|arg| Path::new(&arg).to_path_buf())
.collect();
if paths.is_empty() {
let lo_stats = Path::new("/sys/class/net/lo/statistics/tx_bytes").to_path_buf();
if !lo_stats.exists() {
eprintln!("Must provide path to watch, default system path was not found (probably you're not running on Linux?)");
std::process::exit(1);
}
println!(
"Trying {:?}, use `ping localhost` to see changes!",
lo_stats
);
paths.push(lo_stats);
}

println!("watching {:?}...", paths);

let config = PollWatcherConfig {
compare_contents: true,
poll_interval: Duration::from_secs(2),
};
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = PollWatcher::with_config(tx, config)?;
for path in paths {
watcher.watch(&path, RecursiveMode::Recursive)?;
}

for res in rx {
match res {
Ok(event) => println!("changed: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
}
}

Ok(())
}

/// Entry point: runs the example on every target except Windows, where it
/// returns `Ok(())` without doing anything.
fn main() -> notify::Result<()> {
    // Exactly one of the two cfg-gated statements below survives compilation.
    #[cfg(target_os = "windows")]
    return notify::Result::Ok(());

    #[cfg(not(target_os = "windows"))]
    return not_windows_main();
}
181 changes: 126 additions & 55 deletions src/poll.rs
Expand Up @@ -6,21 +6,27 @@
use super::event::*;
use super::{Error, EventHandler, RecursiveMode, Result, Watcher};
use filetime::FileTime;
use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs;
use std::fs::Metadata;
use std::hash::BuildHasher;
use std::hash::Hasher;
use std::io::{ErrorKind, Read};
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::{Duration, Instant};
use std::{fs, io};
use walkdir::WalkDir;

/// Snapshot of one watched path, taken during a scan and compared against the
/// snapshot from the previous scan to decide whether an event is emitted.
#[derive(Debug, Clone)]
struct PathData {
    /// Last modification time of the path, in whole seconds.
    mtime: i64,
    /// Hash of the file contents; `None` when no hash was computed for the path.
    hash: Option<u64>,
    /// Instant of the scan that produced this snapshot.
    last_check: Instant,
}

Expand All @@ -36,6 +42,33 @@ pub struct PollWatcher {
watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
open: Arc<AtomicBool>,
delay: Duration,
compare_contents: bool,
}

/// Tuning knobs that apply to [`PollWatcher`] only, so that it can be
/// configured independently of the platform specific watchers.
#[derive(Debug, Clone)]
pub struct PollWatcherConfig {
    /// Time to wait between two rescans. Rescanning a large file tree can be
    /// extremely expensive, so measure and tune this value accordingly.
    pub poll_interval: Duration,

    /// When enabled, file contents are read and hashed with a fast hashing
    /// algorithm to decide whether a file has really changed. This matters
    /// most for pseudo filesystems such as `/sys` and `/proc` on Linux, which
    /// are not obligated to respect filesystem norms like modification
    /// timestamps or file sizes. Enabling it significantly impacts
    /// performance, because all files must be read and hashed at each
    /// `poll_interval`.
    pub compare_contents: bool,
}

impl Default for PollWatcherConfig {
    /// Returns the default configuration: a 30 second poll interval with
    /// content comparison turned off.
    fn default() -> Self {
        let poll_interval = Duration::from_secs(30);
        let compare_contents = false;
        Self {
            poll_interval,
            compare_contents,
        }
    }
}

impl Debug for PollWatcher {
Expand All @@ -45,6 +78,7 @@ impl Debug for PollWatcher {
.field("watches", &self.watches)
.field("open", &self.open)
.field("delay", &self.delay)
.field("compare_contents", &self.compare_contents)
.finish()
}
}
Expand All @@ -56,14 +90,66 @@ fn emit_event(event_handler: &Mutex<dyn EventHandler>, res: Result<Event>) {
}
}

impl PathData {
pub fn collect<BH: BuildHasher>(
path: &Path,
metadata: &Metadata,
build_hasher: Option<&BH>,
last_check: Instant,
) -> Self {
let mtime = FileTime::from_last_modification_time(metadata).seconds();
let hash = metadata
.is_file()
.then(|| build_hasher.and_then(|bh| Self::hash_file(path, bh).ok()))
.flatten();
Self {
mtime,
hash,
last_check,
}
}

fn hash_file<P: AsRef<Path>, BH: BuildHasher>(path: P, build_hasher: &BH) -> io::Result<u64> {
let mut hasher = build_hasher.build_hasher();
let mut file = fs::File::open(path)?;
let mut buf = [0; 512];
0xpr03 marked this conversation as resolved.
Show resolved Hide resolved
loop {
let n = match file.read(&mut buf) {
Ok(0) => break,
Ok(len) => len,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
hasher.write(&buf[..n]);
}
Ok(hasher.finish())
}

pub fn detect_change(&self, other: &PathData) -> Option<EventKind> {
if self.mtime > other.mtime {
Some(EventKind::Modify(ModifyKind::Metadata(
MetadataKind::WriteTime,
)))
} else if self.hash != other.hash {
Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
} else {
None
}
}
}

impl PollWatcher {
/// Create a new [PollWatcher] and set the poll frequency to `delay`.
pub fn with_delay<F: EventHandler>(event_handler: F, delay: Duration) -> Result<PollWatcher> {
/// Create a new [PollWatcher], configured as needed.
pub fn with_config<F: EventHandler>(
event_handler: F,
config: PollWatcherConfig,
) -> Result<PollWatcher> {
let mut p = PollWatcher {
event_handler: Arc::new(Mutex::new(event_handler)),
watches: Arc::new(Mutex::new(HashMap::new())),
open: Arc::new(AtomicBool::new(true)),
delay,
delay: config.poll_interval,
compare_contents: config.compare_contents,
};
p.run();
Ok(p)
Expand All @@ -73,6 +159,7 @@ impl PollWatcher {
let watches = self.watches.clone();
let open = self.open.clone();
let delay = self.delay;
let build_hasher = self.compare_contents.then(RandomState::default);
let event_handler = self.event_handler.clone();
let event_handler = move |res| emit_event(&event_handler, res);

Expand Down Expand Up @@ -108,26 +195,20 @@ impl PollWatcher {
}
Ok(metadata) => {
if !metadata.is_dir() {
let mtime =
FileTime::from_last_modification_time(&metadata)
.seconds();
match paths.insert(
watch.clone(),
PathData {
mtime,
last_check: current_time,
},
) {
let path_data = PathData::collect(
watch,
&metadata,
build_hasher.as_ref(),
current_time,
);
match paths.insert(watch.clone(), path_data.clone()) {
None => {
unreachable!();
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta = ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
Some(old_path_data) => {
if let Some(kind) =
path_data.detect_change(&old_path_data)
{
let ev =
Event::new(kind).add_path(watch.clone());
event_handler(Ok(ev));
Expand All @@ -144,23 +225,22 @@ impl PollWatcher {
.filter_map(|e| e.ok())
{
let path = entry.path();

match entry.metadata() {
Err(e) => {
let err = Error::io(e.into())
.add_path(path.to_path_buf());
event_handler(Err(err));
}
Ok(m) => {
let mtime =
FileTime::from_last_modification_time(&m)
.seconds();
let path_data = PathData::collect(
path,
&m,
build_hasher.as_ref(),
current_time,
);
match paths.insert(
path.to_path_buf(),
PathData {
mtime,
last_check: current_time,
},
path_data.clone(),
) {
None => {
let kind =
Expand All @@ -169,14 +249,10 @@ impl PollWatcher {
.add_path(path.to_path_buf());
event_handler(Ok(ev));
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta =
ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
Some(old_path_data) => {
if let Some(kind) = path_data
.detect_change(&old_path_data)
{
// TODO add new mtime as attr
let ev = Event::new(kind)
.add_path(path.to_path_buf());
Expand Down Expand Up @@ -214,6 +290,8 @@ impl PollWatcher {
}

fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
let build_hasher = self.compare_contents.then(RandomState::default);

if let Ok(mut watches) = self.watches.lock() {
let current_time = Instant::now();

Expand All @@ -228,14 +306,9 @@ impl PollWatcher {
let mut paths = HashMap::new();

if !metadata.is_dir() {
let mtime = FileTime::from_last_modification_time(&metadata).seconds();
paths.insert(
watch.clone(),
PathData {
mtime,
last_check: current_time,
},
);
let path_data =
PathData::collect(path, &metadata, build_hasher.as_ref(), current_time);
paths.insert(watch.clone(), path_data);
} else {
let depth = if recursive_mode.is_recursive() {
usize::max_value()
Expand All @@ -256,14 +329,13 @@ impl PollWatcher {
emit_event(&self.event_handler, Err(err));
}
Ok(m) => {
let mtime = FileTime::from_last_modification_time(&m).seconds();
paths.insert(
path.to_path_buf(),
PathData {
mtime,
last_check: current_time,
},
let path_data = PathData::collect(
path,
&m,
build_hasher.as_ref(),
current_time,
);
paths.insert(path.to_path_buf(), path_data);
}
}
}
Expand Down Expand Up @@ -297,8 +369,7 @@ impl Watcher for PollWatcher {
/// The default poll frequency is 30 seconds.
/// Use [`PollWatcher::with_config`] to manually set the poll frequency.
fn new<F: EventHandler>(event_handler: F) -> Result<Self> {
let delay = Duration::from_secs(30);
Self::with_delay(event_handler, delay)
Self::with_config(event_handler, PollWatcherConfig::default())
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
Expand Down