From baf255053d6ac30b6749de2051898852a8d5709f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 4 Mar 2016 16:34:54 +0100 Subject: [PATCH] kqueue: wait without timeout As noted in #24, reading from the kqueue won't unblock when it is closed on all platforms. This commit solves the issue by additionally watching on a pipe which is closed when Watcher is closed. The read timeout is no longer necessary. --- kqueue.go | 81 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/kqueue.go b/kqueue.go index b8ea3084..8d3f5749 100644 --- a/kqueue.go +++ b/kqueue.go @@ -14,16 +14,15 @@ import ( "path/filepath" "sync" "syscall" - "time" ) // Watcher watches a set of files, delivering events to a channel. type Watcher struct { Events chan Event Errors chan error - done chan bool // Channel for sending a "quit message" to the reader goroutine - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + closepipe [2]int // Pipe used for closing. mu sync.Mutex // Protects access to watcher data watches map[string]int // Map of watched file descriptors (key: path). @@ -41,13 +40,14 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, err := kqueue() + kq, closepipe, err := kqueue() if err != nil { return nil, err } w := &Watcher{ kq: kq, + closepipe: closepipe, watches: make(map[string]int), dirFlags: make(map[string]uint32), paths: make(map[int]pathInfo), @@ -55,7 +55,6 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), - done: make(chan bool), } go w.readEvents() @@ -88,8 +87,8 @@ func (w *Watcher) Close() error { } } - // Send "quit" message to the reader goroutine: - w.done <- true + // Send "quit" message to the reader goroutine. + syscall.Close(w.closepipe[1]) return nil } @@ -154,9 +153,6 @@ func (w *Watcher) Remove(name string) error { // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = syscall.NOTE_DELETE | syscall.NOTE_WRITE | syscall.NOTE_ATTRIB | syscall.NOTE_RENAME -// keventWaitTime to block on each read from kevent -var keventWaitTime = durationToTimespec(100 * time.Millisecond) - // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). // Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. @@ -264,23 +260,18 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { // Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]syscall.Kevent_t, 10) - - for { - // See if there is a message on the "done" channel - select { - case <-w.done: - err := syscall.Close(w.kq) - if err != nil { - w.Errors <- err - } - close(w.Events) - close(w.Errors) - return - default: + defer func() { + err := syscall.Close(w.kq) + if err != nil { + w.Errors <- err } + syscall.Close(w.closepipe[0]) + close(w.Events) + close(w.Errors) + }() - // Get new events - kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + for closed := false; !closed; { + kevents, err := read(w.kq, eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != syscall.EINTR { w.Errors <- err @@ -288,10 +279,16 @@ func (w *Watcher) readEvents() { } // Flush the events we received to the Events channel - for len(kevents) > 0 { - kevent := &kevents[0] + for _, kevent := range kevents { watchfd := int(kevent.Ident) mask := uint32(kevent.Fflags) + // Shut down the loop when the pipe is closed, + // but only after all other events have been processed. + if watchfd == w.closepipe[0] { + closed = true + continue + } + w.mu.Lock() path := w.paths[watchfd] w.mu.Unlock() @@ -346,9 +343,6 @@ func (w *Watcher) readEvents() { } } } - - // Move to next event - kevents = kevents[1:] } } } @@ -460,12 +454,24 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. -func kqueue() (kq int, err error) { +func kqueue() (kq int, closepipe [2]int, err error) { kq, err = syscall.Kqueue() if kq == -1 { - return kq, err + return kq, closepipe, err + } + // Register the close pipe. + if err := syscall.Pipe(closepipe[:]); err != nil { + syscall.Close(kq) + return kq, closepipe, err } - return kq, nil + events := make([]syscall.Kevent_t, 1) + const pflag = syscall.EV_ADD | syscall.EV_ENABLE | syscall.EV_ONESHOT + syscall.SetKevent(&events[0], closepipe[0], syscall.EVFILT_READ, pflag) + if ok, err := syscall.Kevent(kq, events, nil, nil); ok == -1 { + syscall.Close(kq) + return kq, closepipe, err + } + return kq, closepipe, nil } // register events with the queue @@ -488,15 +494,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error { // read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) { - n, err := syscall.Kevent(kq, nil, events, timeout) +func read(kq int, events []syscall.Kevent_t) ([]syscall.Kevent_t, error) { + n, err := syscall.Kevent(kq, nil, events, nil) if err != nil { return nil, err } return events[0:n], nil } - -// durationToTimespec prepares a timeout value -func durationToTimespec(d time.Duration) syscall.Timespec { - return syscall.NsecToTimespec(d.Nanoseconds()) -}