Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kqueue: remove timeout from unix.Kevent() #480

Merged
merged 5 commits into from Aug 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 70 additions & 57 deletions kqueue.go
Expand Up @@ -14,7 +14,6 @@ import (
"os"
"path/filepath"
"sync"
"time"

"golang.org/x/sys/unix"
)
Expand All @@ -23,9 +22,10 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
done chan struct{}

kq int // File descriptor (as returned by the kqueue() syscall).
kq int // File descriptor (as returned by the kqueue() syscall).
closepipe [2]int // Pipe used for closing.

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Watched file descriptors (key: path).
Expand All @@ -44,13 +44,14 @@ type pathInfo struct {

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
kq, err := kqueue()
kq, closepipe, err := kqueue()
if err != nil {
return nil, err
}

w := &Watcher{
kq: kq,
closepipe: closepipe,
watches: make(map[string]int),
watchesByDir: make(map[string]map[int]struct{}),
dirFlags: make(map[string]uint32),
Expand Down Expand Up @@ -87,8 +88,8 @@ func (w *Watcher) Close() error {
w.Remove(name)
}

// send a "quit" message to the reader goroutine
close(w.done)
// Send "quit" message to the reader goroutine.
unix.Close(w.closepipe[1])

return nil
}
Expand All @@ -112,8 +113,8 @@ func (w *Watcher) Remove(name string) error {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}

const registerRemove = unix.EV_DELETE
if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
if err != nil {
return err
}

Expand Down Expand Up @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -255,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
isDir = fi.IsDir()
}

const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
if err != nil {
unix.Close(watchfd)
return "", err
}
Expand Down Expand Up @@ -301,35 +299,44 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]unix.Kevent_t, 10)

loop:
for {
// See if there is a message on the "done" channel
select {
case <-w.done:
break loop
default:
defer func() {
err := unix.Close(w.kq)
if err != nil {
w.Errors <- err
}

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
unix.Close(w.closepipe[0])
close(w.done)
close(w.Events)
close(w.Errors)
}()

for closed := false; !closed; {
kevents, err := read(w.kq, eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
select {
case w.Errors <- err:
case <-w.done:
break loop
closed = true
continue
}
continue
}

// Flush the events we received to the Events channel
for len(kevents) > 0 {
for _, kevent := range kevents {
var (
kevent = &kevents[0]
watchfd = int(kevent.Ident)
mask = uint32(kevent.Fflags)
)

// Shut down the loop when the pipe is closed, but only after all
// other events have been processed.
if watchfd == w.closepipe[0] {
closed = true
continue
}

w.mu.Lock()
path := w.paths[watchfd]
w.mu.Unlock()
Expand Down Expand Up @@ -360,7 +367,8 @@ loop:
select {
case w.Events <- event:
case <-w.done:
break loop
closed = true
continue
}
}

Expand Down Expand Up @@ -388,23 +396,8 @@ loop:
}
}
}

// Move to next event
kevents = kevents[1:]
}
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
default:
}
}
close(w.Events)
close(w.Errors)
}

// newEvent returns an platform-independent Event based on kqueue Fflags.
Expand Down Expand Up @@ -517,25 +510,50 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
}

// kqueue creates a new kernel event queue and returns a descriptor.
func kqueue() (kq int, err error) {
//
// This registers a new event on closepipe, which will trigger an event when
// it's closed. This way we can use kevent() without timeout/polling; without
// the closepipe, it would block forever and we wouldn't be able to stop it at
// all.
func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = unix.Kqueue()
if kq == -1 {
return kq, err
return kq, closepipe, err
}

// Register the close pipe.
err = unix.Pipe(closepipe[:])
if err != nil {
unix.Close(kq)
return kq, closepipe, err
}

// Register changes to listen on the closepipe.
changes := make([]unix.Kevent_t, 1)
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)

ok, err := unix.Kevent(kq, changes, nil, nil)
if ok == -1 {
unix.Close(kq)
unix.Close(closepipe[0])
unix.Close(closepipe[1])
return kq, closepipe, err
}
return kq, nil
return kq, closepipe, nil
}

// register events with the queue
// Register events with the queue.
func register(kq int, fds []int, flags int, fflags uint32) error {
changes := make([]unix.Kevent_t, len(fds))

for i, fd := range fds {
// SetKevent converts int to the platform-specific types:
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags)
changes[i].Fflags = fflags
}

// register the events
// Register the events.
success, err := unix.Kevent(kq, changes, nil, nil)
if success == -1 {
return err
Expand All @@ -545,15 +563,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error {

// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, timeout)
func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
return nil, err
}
return events[0:n], nil
}

// durationToTimespec prepares a timeout value
func durationToTimespec(d time.Duration) unix.Timespec {
return unix.NsecToTimespec(d.Nanoseconds())
}