Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kqueue: wait without timeout #124

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 43 additions & 40 deletions kqueue.go
Expand Up @@ -13,7 +13,6 @@ import (
"os"
"path/filepath"
"sync"
"time"

"golang.org/x/sys/unix"
)
Expand All @@ -22,9 +21,9 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan bool // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).
kq int // File descriptor (as returned by the kqueue() syscall).
closepipe [2]int // Pipe used for closing.

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
Expand All @@ -42,21 +41,21 @@ type pathInfo struct {

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
kq, err := kqueue()
kq, closepipe, err := kqueue()
if err != nil {
return nil, err
}

w := &Watcher{
kq: kq,
closepipe: closepipe,
watches: make(map[string]int),
dirFlags: make(map[string]uint32),
paths: make(map[int]pathInfo),
fileExists: make(map[string]bool),
externalWatches: make(map[string]bool),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan bool),
}

go w.readEvents()
Expand Down Expand Up @@ -89,8 +88,8 @@ func (w *Watcher) Close() error {
}
}

// Send "quit" message to the reader goroutine:
w.done <- true
// Send "quit" message to the reader goroutine.
unix.Close(w.closepipe[1])

return nil
}
Expand Down Expand Up @@ -155,9 +154,6 @@ func (w *Watcher) Remove(name string) error {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -265,34 +261,35 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]unix.Kevent_t, 10)

for {
// See if there is a message on the "done" channel
select {
case <-w.done:
err := unix.Close(w.kq)
if err != nil {
w.Errors <- err
}
close(w.Events)
close(w.Errors)
return
default:
defer func() {
err := unix.Close(w.kq)
if err != nil {
w.Errors <- err
}
unix.Close(w.closepipe[0])
close(w.Events)
close(w.Errors)
}()

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
for closed := false; !closed; {
kevents, err := read(w.kq, eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
w.Errors <- err
continue
}

// Flush the events we received to the Events channel
for len(kevents) > 0 {
kevent := &kevents[0]
for _, kevent := range kevents {
watchfd := int(kevent.Ident)
mask := uint32(kevent.Fflags)
// Shut down the loop when the pipe is closed,
// but only after all other events have been processed.
if watchfd == w.closepipe[0] {
closed = true
continue
}

w.mu.Lock()
path := w.paths[watchfd]
w.mu.Unlock()
Expand Down Expand Up @@ -347,9 +344,6 @@ func (w *Watcher) readEvents() {
}
}
}

// Move to next event
kevents = kevents[1:]
}
}
}
Expand Down Expand Up @@ -461,12 +455,26 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
}

// kqueue creates a new kernel event queue and returns a descriptor.
func kqueue() (kq int, err error) {
func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = unix.Kqueue()
if kq == -1 {
return kq, err
return kq, closepipe, err
}
// Register the close pipe.
if err := unix.Pipe(closepipe[:]); err != nil {
unix.Close(kq)
return kq, closepipe, err
}
return kq, nil
events := make([]unix.Kevent_t, 1)
const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT
unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag)
if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 {
unix.Close(kq)
unix.Close(closepipe[0])
unix.Close(closepipe[1])
return kq, closepipe, err
}
return kq, closepipe, nil
}

// register events with the queue
Expand All @@ -489,15 +497,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error {

// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, timeout)
func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
return nil, err
}
return events[0:n], nil
}

// durationToTimespec prepares a timeout value
func durationToTimespec(d time.Duration) unix.Timespec {
return unix.NsecToTimespec(d.Nanoseconds())
}