Skip to content

Commit

Permalink
inotify: add recursive watcher
Browse files Browse the repository at this point in the history
This adds a recursive watcher for inotify.
  • Loading branch information
arp242 committed May 1, 2024
1 parent 17d9053 commit f4c00e5
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 59 deletions.
208 changes: 164 additions & 44 deletions backend_inotify.go
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -163,9 +164,10 @@ type (
path map[string]uint32 // pathname → wd
}
watch struct {
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
path string // Watch path.
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
path string // Watch path.
recurse bool // Recursion with ./...?
}
koekje struct {
cookie uint32
Expand Down Expand Up @@ -200,19 +202,37 @@ func (w *watches) remove(wd uint32) {
delete(w.wd, wd)
}

func (w *watches) removePath(path string) (uint32, bool) {
func (w *watches) removePath(path string) ([]uint32, error) {
w.mu.Lock()
defer w.mu.Unlock()

path, recurse := recursivePath(path)
wd, ok := w.path[path]
if !ok {
return 0, false
return nil, fmt.Errorf("%w: %s", ErrNonExistentWatch, path)
}

watch := w.wd[wd]
if recurse && !watch.recurse {
return nil, fmt.Errorf("can't use /... with non-recursive watch %q", path)
}

delete(w.path, path)
delete(w.wd, wd)
if !watch.recurse {
return []uint32{wd}, nil
}

return wd, true
wds := make([]uint32, 0, 8)
wds = append(wds, wd)
for p, rwd := range w.path {
if filepath.HasPrefix(p, path) {
delete(w.path, p)
delete(w.wd, rwd)
wds = append(wds, rwd)
}
}
return wds, nil
}

func (w *watches) byPath(path string) *watch {
Expand Down Expand Up @@ -388,20 +408,51 @@ func (w *Watcher) Add(name string) error { return w.AddWith(name) }
//
// - [WithBufferSize] sets the buffer size for the Windows backend; no-op on
// other platforms. The default is 64K (65536 bytes).
func (w *Watcher) AddWith(name string, opts ...addOpt) error {
func (w *Watcher) AddWith(path string, opts ...addOpt) error {
if w.isClosed() {
return ErrClosed
}
if debug {
fmt.Fprintf(os.Stderr, "FSNOTIFY_DEBUG: %s AddWith(%q)\n",
time.Now().Format("15:04:05.000000000"), name)
time.Now().Format("15:04:05.000000000"), path)
}

with := getOptions(opts...)
if !w.xSupports(with.op) {
return fmt.Errorf("%w: %s", xErrUnsupported, with.op)
}

path, recurse := recursivePath(path)
if recurse {
return filepath.WalkDir(path, func(root string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
if root == path {
return fmt.Errorf("fsnotify: not a directory: %q", path)
}
return nil
}

// Send a Create event when adding new directory from a recursive
// watch; this is for "mkdir -p one/two/three". Usually all those
// directories will be created before we can set up watchers on the
// subdirectories, so only "one" would be sent as a Create event and
// not "one/two" and "one/two/three" (inotifywait -r has the same
// problem).
if with.sendCreate && root != path {
w.sendEvent(Event{Name: root, Op: Create})
}

return w.add(root, with, true)
})
}

return w.add(path, with, false)
}

func (w *Watcher) add(path string, with withOpts, recurse bool) error {
var flags uint32
if with.noFollow {
flags |= unix.IN_DONT_FOLLOW
Expand Down Expand Up @@ -433,23 +484,26 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
if with.op.Has(xUnportableCloseRead) {
flags |= unix.IN_CLOSE_NOWRITE
}
return w.register(path, flags, recurse)
}

name = filepath.Clean(name)
return w.watches.updatePath(name, func(existing *watch) (*watch, error) {
func (w *Watcher) register(path string, flags uint32, recurse bool) error {
return w.watches.updatePath(path, func(existing *watch) (*watch, error) {
if existing != nil {
flags |= existing.flags | unix.IN_MASK_ADD
}

wd, err := unix.InotifyAddWatch(w.fd, name, flags)
wd, err := unix.InotifyAddWatch(w.fd, path, flags)
if wd == -1 {
return nil, err
}

if existing == nil {
return &watch{
wd: uint32(wd),
path: name,
flags: flags,
wd: uint32(wd),
path: path,
flags: flags,
recurse: recurse,
}, nil
}

Expand Down Expand Up @@ -479,24 +533,27 @@ func (w *Watcher) Remove(name string) error {
}

func (w *Watcher) remove(name string) error {
wd, ok := w.watches.removePath(name)
if !ok {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}

success, errno := unix.InotifyRmWatch(w.fd, wd)
if success == -1 {
// TODO: Perhaps it's not helpful to return an error here in every case;
// The only two possible errors are:
//
// - EBADF, which happens when w.fd is not a valid file descriptor
// of any kind.
// - EINVAL, which is when fd is not an inotify descriptor or wd
// is not a valid watch descriptor. Watch descriptors are
// invalidated when they are removed explicitly or implicitly;
// explicitly by inotify_rm_watch, implicitly when the file they
// are watching is deleted.
return errno
wds, err := w.watches.removePath(name)
if err != nil {
return err
}

for _, wd := range wds {
_, err := unix.InotifyRmWatch(w.fd, wd)
if err != nil {
// TODO: Perhaps it's not helpful to return an error here in every
// case; the only two possible errors are:
//
// EBADF, which happens when w.fd is not a valid file descriptor of
// any kind.
//
// EINVAL, which is when fd is not an inotify descriptor or wd is
// not a valid watch descriptor. Watch descriptors are invalidated
// when they are removed explicitly or implicitly; explicitly by
// inotify_rm_watch, implicitly when the file they are watching is
// deleted.
return err
}
}
return nil
}
Expand Down Expand Up @@ -566,6 +623,7 @@ func (w *Watcher) readEvents() {
}

var offset uint32

// We don't know how many events we just read into the buffer
// While the offset points to at least one whole event...
for offset <= uint32(n-unix.SizeofInotifyEvent) {
Expand All @@ -574,6 +632,8 @@ func (w *Watcher) readEvents() {
raw = (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))
mask = uint32(raw.Mask)
nameLen = uint32(raw.Len)
// Move to the next event in the buffer
next = func() { offset += unix.SizeofInotifyEvent + nameLen }
)

if mask&unix.IN_Q_OVERFLOW != 0 {
Expand Down Expand Up @@ -603,15 +663,26 @@ func (w *Watcher) readEvents() {
internal.Debug(name, raw.Mask, raw.Cookie)
}

if mask&unix.IN_IGNORED != 0 { //&& event.Op != 0
next()
continue
}

// inotify will automatically remove the watch on deletes; just need
// to clean our state here.
if watch != nil && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
w.watches.remove(watch.wd)
}

// We can't really update the state when a watched path is moved;
// only IN_MOVE_SELF is sent and not IN_MOVED_{FROM,TO}. So remove
// the watch.
if watch != nil && mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF {
if watch.recurse {
next() // Do nothing
continue
}

err := w.remove(watch.path)
if err != nil && !errors.Is(err, ErrNonExistentWatch) {
if !w.sendError(err) {
Expand All @@ -620,30 +691,68 @@ func (w *Watcher) readEvents() {
}
}

skip := mask&unix.IN_IGNORED != 0

/// Skip if we're watching both this path and the parent; the parent
/// will already send a delete so no need to do it twice.
if !skip && mask&unix.IN_DELETE_SELF != 0 {
if mask&unix.IN_DELETE_SELF != 0 {
if _, ok := w.watches.path[filepath.Dir(watch.path)]; ok {
skip = true
next()
continue
}
}

/// Send the events that are not ignored on the events channel
if !skip {
if !w.sendEvent(w.newEvent(name, mask, raw.Cookie)) {
return
ev := w.newEvent(name, mask, raw.Cookie)
// Need to update watch path for recurse.
if watch != nil && watch.recurse {
isDir := mask&unix.IN_ISDIR == unix.IN_ISDIR
/// New directory created: set up watch on it.
if isDir && ev.Has(Create) {
err := w.register(ev.Name, watch.flags, true)
if !w.sendError(err) {
return
}

// This was a directory rename, so we need to update all
// the children.
//
// TODO: this is of course pretty slow; we should use a
// better data structure for storing all of this, e.g. store
// children in the watch. I have some code for this in my
// kqueue refactor we can use in the future. For now I'm
// okay with this as it's not publicly available.
// Correctness first, performance second.
if ev.renamedFrom != "" {
w.watches.mu.Lock()
for k, ww := range w.watches.wd {
if k == watch.wd || ww.path == ev.Name {
continue
}
if strings.HasPrefix(ww.path, ev.renamedFrom) {
ww.path = strings.Replace(ww.path, ev.renamedFrom, ev.Name, 1)
w.watches.wd[k] = ww
}
}
w.watches.mu.Unlock()
}
}
}

/// Move to the next event in the buffer
offset += unix.SizeofInotifyEvent + nameLen
/// Send the events that are not ignored on the events channel
if !w.sendEvent(ev) {
return
}
next()
}
}
}

// newEvent returns an platform-independent Event based on an inotify mask.
func (w *Watcher) isRecursive(path string) bool {
ww := w.watches.byPath(path)
if ww == nil { // path could be a file, so also check the Dir.
ww = w.watches.byPath(filepath.Dir(path))
}
return ww != nil && ww.recurse
}

func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
e := Event{Name: name}
if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
Expand All @@ -667,7 +776,10 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
if mask&unix.IN_CLOSE_NOWRITE == unix.IN_CLOSE_NOWRITE {
e.Op |= xUnportableCloseRead
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF || mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
if mask&unix.IN_MOVED_FROM == unix.IN_MOVED_FROM {
e.Op |= Rename
}
if mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF {
e.Op |= Rename
}
if mask&unix.IN_ATTRIB == unix.IN_ATTRIB {
Expand Down Expand Up @@ -706,3 +818,11 @@ func (w *Watcher) newEvent(name string, mask, cookie uint32) Event {
func (w *Watcher) xSupports(op Op) bool {
return true // Supports everything.
}

func (w *Watcher) state() {
w.watches.mu.Lock()
defer w.watches.mu.Unlock()
for wd, ww := range w.watches.wd {
fmt.Fprintf(os.Stderr, "%4d: recurse=%t %q\n", wd, ww.recurse, ww.path)
}
}
13 changes: 10 additions & 3 deletions fsnotify.go
Expand Up @@ -188,9 +188,10 @@ func (e Event) String() string {
type (
addOpt func(opt *withOpts)
withOpts struct {
bufsize int
op Op
noFollow bool
bufsize int
op Op
noFollow bool
sendCreate bool
}
)

Expand Down Expand Up @@ -254,11 +255,17 @@ func withNoFollow() addOpt {
return func(opt *withOpts) { opt.noFollow = true }
}

// "Internal" option for recursive watches on inotify.
func withCreate() addOpt {
return func(opt *withOpts) { opt.sendCreate = true }
}

var enableRecurse = false

// Check if this path is recursive (ends with "/..." or "\..."), and return the
// path with the /... stripped.
func recursivePath(path string) (string, bool) {
path = filepath.Clean(path)
if !enableRecurse { // Only enabled in tests for now.
return path, false
}
Expand Down
6 changes: 1 addition & 5 deletions fsnotify_test.go
Expand Up @@ -31,13 +31,9 @@ func init() {

func TestScript(t *testing.T) {
err := filepath.Walk("./testdata", func(path string, info fs.FileInfo, err error) error {
if err != nil {
if err != nil || info.IsDir() {
return err
}
if info.IsDir() {
return nil
}
//t.Run(filepath.ToSlash(path), func(t *testing.T) {
n := strings.Split(filepath.ToSlash(path), "/")
t.Run(strings.Join(n[1:], "/"), func(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit f4c00e5

Please sign in to comment.