Skip to content

Commit

Permalink
inotify: simplify bookkeeping of watched paths
Browse files Browse the repository at this point in the history
Create a new watcher type to keep track of the watches instead of
keeping two maps on the Watcher and accessing these directly.

This makes the bookkeeping a bit easier to follow, and we no longer need
to worry about locking map access as the watcher type takes care of that
now.

Came up in #472 where I want to keep track if a path was added
recursively, and this makes that a bit easier.

Also seems a bit faster:

	BenchmarkWatch-2          903709              7122 ns/op             194 B/op          3 allocs/op
	BenchmarkWatch-2          923980              6322 ns/op             196 B/op          3 allocs/op

Although that benchmark is very simply and only tests one code path;
just want to make sure it's not a horrible regression.
  • Loading branch information
arp242 committed Dec 20, 2022
1 parent 36dadbe commit 0c2dd73
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 55 deletions.
140 changes: 90 additions & 50 deletions backend_inotify.go
Expand Up @@ -130,12 +130,64 @@ type Watcher struct {
// Store fd here as os.File.Read() will no longer return on close after
// calling Fd(). See: https://github.com/golang/go/issues/26439
fd int
mu sync.Mutex // Map access
inotifyFile *os.File
watches map[string]*watch // Map of inotify watches (path → watch)
paths map[int]string // Map of watched paths (watch descriptor → path)
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
doneResp chan struct{} // Channel to respond to Close
watches *watches
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
closeMu sync.Mutex
doneResp chan struct{} // Channel to respond to Close
}

type (
watches struct {
mu sync.RWMutex
wd map[uint32]*watch // wd → watch
path map[string]uint32 // pathname → wd
}
watch struct {
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
path string // Watch path.
}
)

func newWatches() *watches {
return &watches{
wd: make(map[uint32]*watch),
path: make(map[string]uint32),
}
}

func (w *watches) len() int {
w.mu.RLock()
defer w.mu.RUnlock()
return len(w.wd)
}

func (w *watches) add(ww *watch) {
w.mu.Lock()
defer w.mu.Unlock()
w.wd[ww.wd] = ww
w.path[ww.path] = ww.wd
}

func (w *watches) removeWd(wd uint32) {
w.mu.Lock()
defer w.mu.Unlock()

delete(w.path, w.wd[wd].path)
delete(w.wd, wd)
}

func (w *watches) byPath(path string) *watch {
w.mu.RLock()
defer w.mu.RUnlock()
return w.wd[w.path[path]]
}

func (w *watches) byWd(wd uint32) *watch {
w.mu.RLock()
defer w.mu.RUnlock()
return w.wd[wd]
}

// NewWatcher creates a new Watcher.
Expand All @@ -151,8 +203,7 @@ func NewWatcher() (*Watcher, error) {
w := &Watcher{
fd: fd,
inotifyFile: os.NewFile(uintptr(fd), ""),
watches: make(map[string]*watch),
paths: make(map[int]string),
watches: newWatches(),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan struct{}),
Expand All @@ -169,8 +220,8 @@ func (w *Watcher) sendEvent(e Event) bool {
case w.Events <- e:
return true
case <-w.done:
return false
}
return false
}

// Returns true if the error was sent, or false if watcher is closed.
Expand All @@ -194,15 +245,13 @@ func (w *Watcher) isClosed() bool {

// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
w.mu.Lock()
w.closeMu.Lock()
if w.isClosed() {
w.mu.Unlock()
w.closeMu.Unlock()
return nil
}

// Send 'close' signal to goroutine, and set the Watcher to closed.
close(w.done)
w.mu.Unlock()
w.closeMu.Unlock()

// Causes any blocking reads to return with an error, provided the file
// still supports deadline operations.
Expand Down Expand Up @@ -273,23 +322,24 @@ func (w *Watcher) AddWith(name string, opts ...addOpt) error {
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF

w.mu.Lock()
defer w.mu.Unlock()
watchEntry := w.watches[name]
if watchEntry != nil {
flags |= watchEntry.flags | unix.IN_MASK_ADD
existing := w.watches.byPath(name)
if existing != nil {
flags |= existing.flags | unix.IN_MASK_ADD
}
wd, errno := unix.InotifyAddWatch(w.fd, name, flags)
if wd == -1 {
return errno
}

if watchEntry == nil {
w.watches[name] = &watch{wd: uint32(wd), flags: flags}
w.paths[wd] = name
if existing == nil {
w.watches.add(&watch{
wd: uint32(wd),
path: name,
flags: flags,
})
} else {
watchEntry.wd = uint32(wd)
watchEntry.flags = flags
existing.wd = uint32(wd)
existing.flags = flags
}

return nil
Expand Down Expand Up @@ -317,22 +367,16 @@ func (w *Watcher) Remove(name string) error {

name = filepath.Clean(name)

// Fetch the watch.
w.mu.Lock()
defer w.mu.Unlock()

watch, ok := w.watches[name]
if !ok {
watch := w.watches.byPath(name)
if watch == nil {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}

return w.remove(name, watch)
}

// Unlocked!
func (w *Watcher) remove(name string, watch *watch) error {
delete(w.paths, int(watch.wd))
delete(w.watches, name)
w.watches.removeWd(watch.wd)

success, errno := unix.InotifyRmWatch(w.fd, watch.wd)
if success == -1 {
Expand All @@ -359,22 +403,16 @@ func (w *Watcher) WatchList() []string {
return nil
}

w.mu.Lock()
defer w.mu.Unlock()

entries := make([]string, 0, len(w.watches))
for pathname := range w.watches {
entries := make([]string, 0, w.watches.len())
w.watches.mu.RLock()
for pathname := range w.watches.path {
entries = append(entries, pathname)
}
w.watches.mu.RUnlock()

return entries
}

type watch struct {
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
}

// readEvents reads from the inotify file descriptor, converts the
// received events into Event objects and sends them via the Events channel
func (w *Watcher) readEvents() {
Expand Down Expand Up @@ -444,27 +482,29 @@ func (w *Watcher) readEvents() {
// doesn't append the filename to the event, but we would like to always fill the
// the "Name" field with a valid filename. We retrieve the path of the watch from
// the "paths" map.
w.mu.Lock()
name, ok := w.paths[int(raw.Wd)]
watch := w.watches.byWd(uint32(raw.Wd))

// inotify will automatically remove the watch on deletes; just need
// to clean our state here.
if ok && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
delete(w.paths, int(raw.Wd))
delete(w.watches, name)
if watch != nil && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
w.watches.removeWd(watch.wd)
}
// We can't really update the state when a watched path is moved;
// only IN_MOVE_SELF is sent and not IN_MOVED_{FROM,TO}. So remove
// the watch.
if ok && mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF {
err := w.remove(name, w.watches[name])
if watch != nil && mask&unix.IN_MOVE_SELF == unix.IN_MOVE_SELF {
err := w.remove(watch.path, watch)
if err != nil {
if !w.sendError(err) {
return
}
}
}
w.mu.Unlock()

var name string
if watch != nil {
name = watch.path
}
if nameLen > 0 {
// Point "bytes" at the first byte of the filename
bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen]
Expand Down
5 changes: 1 addition & 4 deletions backend_inotify_test.go
Expand Up @@ -117,12 +117,9 @@ func TestRemoveState(t *testing.T) {

check := func(want int) {
t.Helper()
if len(w.watches) != want {
if w.watches.len() != want {
t.Error(w.watches)
}
if len(w.paths) != want {
t.Error(w.paths)
}
}

check(2)
Expand Down
2 changes: 1 addition & 1 deletion fsnotify_test.go
Expand Up @@ -1503,7 +1503,7 @@ func BenchmarkWatch(b *testing.B) {
wg.Done()
return
}
b.Fatal(err)
b.Error(err)
case _, ok := <-w.Events:
if !ok {
wg.Done()
Expand Down

0 comments on commit 0c2dd73

Please sign in to comment.