diff --git a/kqueue.go b/kqueue.go index c2b4acb1..cfa61610 100644 --- a/kqueue.go +++ b/kqueue.go @@ -13,7 +13,6 @@ import ( "os" "path/filepath" "sync" - "time" "golang.org/x/sys/unix" ) @@ -22,9 +21,9 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool // Channel for sending a "quit message" to the reader goroutine - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + closepipe [2]int // Pipe used for closing. mu sync.Mutex // Protects access to watcher data watches map[string]int // Map of watched file descriptors (key: path). @@ -42,13 +41,14 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, err := kqueue() + kq, closepipe, err := kqueue() if err != nil { return nil, err } w := &Watcher{ kq: kq, + closepipe: closepipe, watches: make(map[string]int), dirFlags: make(map[string]uint32), paths: make(map[int]pathInfo), @@ -56,7 +56,6 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), - done: make(chan bool), } go w.readEvents() @@ -89,8 +88,8 @@ func (w *Watcher) Close() error { } } - // Send "quit" message to the reader goroutine: - w.done <- true + // Send "quit" message to the reader goroutine. + unix.Close(w.closepipe[1]) return nil } @@ -155,9 +154,6 @@ func (w *Watcher) Remove(name string) error { // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME -// keventWaitTime to block on each read from kevent -var keventWaitTime = durationToTimespec(100 * time.Millisecond) - // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). // Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. @@ -265,23 +261,18 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { // Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) - - for { - // See if there is a message on the "done" channel - select { - case <-w.done: - err := unix.Close(w.kq) - if err != nil { - w.Errors <- err - } - close(w.Events) - close(w.Errors) - return - default: + defer func() { + err := unix.Close(w.kq) + if err != nil { + w.Errors <- err } + unix.Close(w.closepipe[0]) + close(w.Events) + close(w.Errors) + }() - // Get new events - kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + for closed := false; !closed; { + kevents, err := read(w.kq, eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { w.Errors <- err @@ -289,10 +280,16 @@ func (w *Watcher) readEvents() { } // Flush the events we received to the Events channel - for len(kevents) > 0 { - kevent := &kevents[0] + for _, kevent := range kevents { watchfd := int(kevent.Ident) mask := uint32(kevent.Fflags) + // Shut down the loop when the pipe is closed, + // but only after all other events have been processed. + if watchfd == w.closepipe[0] { + closed = true + continue + } + w.mu.Lock() path := w.paths[watchfd] w.mu.Unlock() @@ -347,9 +344,6 @@ func (w *Watcher) readEvents() { } } } - - // Move to next event - kevents = kevents[1:] } } } @@ -461,12 +455,26 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. -func kqueue() (kq int, err error) { +func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { - return kq, err + return kq, closepipe, err + } + // Register the close pipe. + if err := unix.Pipe(closepipe[:]); err != nil { + unix.Close(kq) + return kq, closepipe, err } - return kq, nil + events := make([]unix.Kevent_t, 1) + const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT + unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag) + if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 { + unix.Close(kq) + unix.Close(closepipe[0]) + unix.Close(closepipe[1]) + return kq, closepipe, err + } + return kq, closepipe, nil } // register events with the queue @@ -489,15 +497,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error { // read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) { - n, err := unix.Kevent(kq, nil, events, timeout) +func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(kq, nil, events, nil) if err != nil { return nil, err } return events[0:n], nil } - -// durationToTimespec prepares a timeout value -func durationToTimespec(d time.Duration) unix.Timespec { - return unix.NsecToTimespec(d.Nanoseconds()) -}