Skip to content

Commit

Permalink
Fix possible deadlock on closing the watcher on kqueue (#230)
Browse files Browse the repository at this point in the history
* avoid deadlocks on Close()

raw channel work (not inside a select) should
always be prohibited in production code, as
it readily causes deadlocks on shutdown.

Also adds the test TestWatcherClose from #145.
This request duplicates that test, with two
lines fixed to address the houndcli-bot review
concerns.

Fixes #187
Fixes #145

* cleanup and simpler test

* also fix #225

* fix tests
  • Loading branch information
nhooyr authored and nathany committed Jan 10, 2018
1 parent 4da3e2c commit 3d33f50
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 23 deletions.
32 changes: 31 additions & 1 deletion fsnotify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

package fsnotify

import "testing"
import (
"os"
"testing"
"time"
)

func TestEventStringWithValue(t *testing.T) {
for opMask, expectedString := range map[Op]string{
Expand Down Expand Up @@ -38,3 +42,29 @@ func TestEventOpStringWithNoValue(t *testing.T) {
t.Fatalf("Expected %s, got: %v", expectedOpString, event.Op.String())
}
}

// TestWatcherClose tests that the goroutine started by creating the watcher can be
// signalled to return at any time, even if there is no goroutine listening on the events
// or errors channels.
func TestWatcherClose(t *testing.T) {
t.Parallel()

name := tempMkFile(t, "")
w := newWatcher(t)
err := w.Add(name)
if err != nil {
t.Fatal(err)
}

err = os.Remove(name)
if err != nil {
t.Fatal(err)
}
// Allow the watcher to receive the event.
time.Sleep(time.Millisecond * 100)

err = w.Close()
if err != nil {
t.Fatal(err)
}
}
62 changes: 40 additions & 22 deletions kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan bool // Channel for sending a "quit message" to the reader goroutine
done chan struct{} // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).

Expand Down Expand Up @@ -56,7 +56,7 @@ func NewWatcher() (*Watcher, error) {
externalWatches: make(map[string]bool),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan bool),
done: make(chan struct{}),
}

go w.readEvents()
Expand All @@ -71,26 +71,21 @@ func (w *Watcher) Close() error {
return nil
}
w.isClosed = true
w.mu.Unlock()

// copy paths to remove while locked
w.mu.Lock()
var pathsToRemove = make([]string, 0, len(w.watches))
for name := range w.watches {
pathsToRemove = append(pathsToRemove, name)
}
w.mu.Unlock()
// unlock before calling Remove, which also locks

var err error
for _, name := range pathsToRemove {
if e := w.Remove(name); e != nil && err == nil {
err = e
}
w.Remove(name)
}

// Send "quit" message to the reader goroutine:
w.done <- true
// send a "quit" message to the reader goroutine
close(w.done)

return nil
}
Expand Down Expand Up @@ -266,25 +261,24 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
func (w *Watcher) readEvents() {
eventBuffer := make([]unix.Kevent_t, 10)

loop:
for {
// See if there is a message on the "done" channel
select {
case <-w.done:
err := unix.Close(w.kq)
if err != nil {
w.Errors <- err
}
close(w.Events)
close(w.Errors)
return
break loop
default:
}

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
w.Errors <- err
select {
case w.Errors <- err:
case <-w.done:
break loop
}
continue
}

Expand Down Expand Up @@ -319,8 +313,12 @@ func (w *Watcher) readEvents() {
if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) {
w.sendDirectoryChangeEvents(event.Name)
} else {
// Send the event on the Events channel
w.Events <- event
// Send the event on the Events channel.
select {
case w.Events <- event:
case <-w.done:
break loop
}
}

if event.Op&Remove == Remove {
Expand Down Expand Up @@ -352,6 +350,18 @@ func (w *Watcher) readEvents() {
kevents = kevents[1:]
}
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
default:
}
}
close(w.Events)
close(w.Errors)
}

// newEvent returns an platform-independent Event based on kqueue Fflags.
Expand Down Expand Up @@ -407,7 +417,11 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
// Get all files
files, err := ioutil.ReadDir(dirPath)
if err != nil {
w.Errors <- err
select {
case w.Errors <- err:
case <-w.done:
return
}
}

// Search for new files
Expand All @@ -428,7 +442,11 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf
w.mu.Unlock()
if !doesExist {
// Send create event
w.Events <- newCreateEvent(filePath)
select {
case w.Events <- newCreateEvent(filePath):
case <-w.done:
return
}
}

// like watchDirectoryFiles (but without doing another ReadDir)
Expand Down

0 comments on commit 3d33f50

Please sign in to comment.