Skip to content

Commit

Permalink
kqueue: wait without timeout
Browse files Browse the repository at this point in the history
As noted in #24, reading from the kqueue won't unblock when
it is closed on all platforms. This commit solves the issue by
additionally watching on a pipe which is closed when Watcher
is closed. The read timeout is no longer necessary.
  • Loading branch information
fjl committed Mar 4, 2016
1 parent e3b2f02 commit baf2550
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions kqueue.go
Expand Up @@ -14,16 +14,15 @@ import (
"path/filepath"
"sync"
"syscall"
"time"
)

// Watcher watches a set of files, delivering events to a channel.
type Watcher struct {
Events chan Event
Errors chan error
done chan bool // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).
kq int // File descriptor (as returned by the kqueue() syscall).
closepipe [2]int // Pipe used for closing.

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
Expand All @@ -41,21 +40,21 @@ type pathInfo struct {

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
kq, err := kqueue()
kq, closepipe, err := kqueue()
if err != nil {
return nil, err
}

w := &Watcher{
kq: kq,
closepipe: closepipe,
watches: make(map[string]int),
dirFlags: make(map[string]uint32),
paths: make(map[int]pathInfo),
fileExists: make(map[string]bool),
externalWatches: make(map[string]bool),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan bool),
}

go w.readEvents()
Expand Down Expand Up @@ -88,8 +87,8 @@ func (w *Watcher) Close() error {
}
}

// Send "quit" message to the reader goroutine:
w.done <- true
// Send "quit" message to the reader goroutine.
syscall.Close(w.closepipe[1])

return nil
}
Expand Down Expand Up @@ -154,9 +153,6 @@ func (w *Watcher) Remove(name string) error {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = syscall.NOTE_DELETE | syscall.NOTE_WRITE | syscall.NOTE_ATTRIB | syscall.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -264,34 +260,35 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]syscall.Kevent_t, 10)

for {
// See if there is a message on the "done" channel
select {
case <-w.done:
err := syscall.Close(w.kq)
if err != nil {
w.Errors <- err
}
close(w.Events)
close(w.Errors)
return
default:
defer func() {
err := syscall.Close(w.kq)
if err != nil {
w.Errors <- err
}
syscall.Close(w.closepipe[0])
close(w.Events)
close(w.Errors)
}()

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
for closed := false; !closed; {
kevents, err := read(w.kq, eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != syscall.EINTR {
w.Errors <- err
continue
}

// Flush the events we received to the Events channel
for len(kevents) > 0 {
kevent := &kevents[0]
for _, kevent := range kevents {
watchfd := int(kevent.Ident)
mask := uint32(kevent.Fflags)
// Shut down the loop when the pipe is closed,
// but only after all other events have been processed.
if watchfd == w.closepipe[0] {
closed = true
continue
}

w.mu.Lock()
path := w.paths[watchfd]
w.mu.Unlock()
Expand Down Expand Up @@ -346,9 +343,6 @@ func (w *Watcher) readEvents() {
}
}
}

// Move to next event
kevents = kevents[1:]
}
}
}
Expand Down Expand Up @@ -460,12 +454,24 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
}

// kqueue creates a new kernel event queue and returns a descriptor.
func kqueue() (kq int, err error) {
func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = syscall.Kqueue()
if kq == -1 {
return kq, err
return kq, closepipe, err
}
// Register the close pipe.
if err := syscall.Pipe(closepipe[:]); err != nil {
syscall.Close(kq)
return kq, closepipe, err
}
return kq, nil
events := make([]syscall.Kevent_t, 1)
const pflag = syscall.EV_ADD | syscall.EV_ENABLE | syscall.EV_ONESHOT
syscall.SetKevent(&events[0], closepipe[0], syscall.EVFILT_READ, pflag)
if ok, err := syscall.Kevent(kq, events, nil, nil); ok == -1 {
syscall.Close(kq)
return kq, closepipe, err
}
return kq, closepipe, nil
}

// register events with the queue
Expand All @@ -488,15 +494,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error {

// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) {
n, err := syscall.Kevent(kq, nil, events, timeout)
func read(kq int, events []syscall.Kevent_t) ([]syscall.Kevent_t, error) {
n, err := syscall.Kevent(kq, nil, events, nil)
if err != nil {
return nil, err
}
return events[0:n], nil
}

// durationToTimespec prepares a timeout value
func durationToTimespec(d time.Duration) syscall.Timespec {
return syscall.NsecToTimespec(d.Nanoseconds())
}

0 comments on commit baf2550

Please sign in to comment.