From 066bc383f5c94d930fe5e941a9990663525f3a21 Mon Sep 17 00:00:00 2001 From: horahoradev Date: Sun, 27 Feb 2022 15:00:24 -0800 Subject: [PATCH] Replaced use of raw epoll with netpoller read --- .github/workflows/vagrant_test.yml | 56 +++++++ Vagrantfiles/debian6/Vagrantfile | 7 + inotify.go | 98 ++++-------- inotify_poller.go | 187 ----------------------- inotify_poller_test.go | 234 ----------------------------- inotify_test.go | 39 +++++ 6 files changed, 135 insertions(+), 486 deletions(-) create mode 100644 .github/workflows/vagrant_test.yml create mode 100644 Vagrantfiles/debian6/Vagrantfile delete mode 100644 inotify_poller.go delete mode 100644 inotify_poller_test.go diff --git a/.github/workflows/vagrant_test.yml b/.github/workflows/vagrant_test.yml new file mode 100644 index 00000000..54b4ecef --- /dev/null +++ b/.github/workflows/vagrant_test.yml @@ -0,0 +1,56 @@ +# MIT License + +# Copyright (c) 2021 Jonas Hecht + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# https://github.com/jonashackt/vagrant-github-actions/ + +name: vagrant_test +on: + push: + pull_request: +jobs: + test: + strategy: + fail-fast: false + matrix: + vagrant_image: + - debian6 + runs-on: macos-10.15 + steps: + - uses: actions/checkout@v2 + + - name: setup Go + uses: actions/setup-go@v2 + with: + go-version: '1.17' + + - name: Show Vagrant version + run: vagrant --version + + - name: compile test binary + run: | + GOOS=linux GOARCH=amd64 go test -o Vagrantfiles/${{ matrix.vagrant_image}}/fsnotify.test -c ./... + + - name: Run vagrant up + run: cd Vagrantfiles/${{ matrix.vagrant_image}} && vagrant up + + - name: run the test binary + run: cd Vagrantfiles/${{ matrix.vagrant_image}} && vagrant ssh -c "/vagrant/fsnotify.test" \ No newline at end of file diff --git a/Vagrantfiles/debian6/Vagrantfile b/Vagrantfiles/debian6/Vagrantfile new file mode 100644 index 00000000..e2888b89 --- /dev/null +++ b/Vagrantfiles/debian6/Vagrantfile @@ -0,0 +1,7 @@ +Vagrant.configure("2") do |config| + config.vm.box = "threatstack/debian6" + config.vm.box_version = "1.0.0" + + config.vm.define 'debian6' + +end \ No newline at end of file diff --git a/inotify.go b/inotify.go index b01124a6..02570f11 100644 --- a/inotify.go +++ b/inotify.go @@ -22,39 +22,36 @@ import ( // Watcher watches a set of files, delivering events to a channel. type Watcher struct { - Events chan Event - Errors chan error - mu sync.Mutex // Map access - fd int - poller *fdPoller - watches map[string]*watch // Map of inotify watches (key: path) - paths map[int]string // Map of watched paths (key: watch descriptor) - done chan struct{} // Channel for sending a "quit message" to the reader goroutine - doneResp chan struct{} // Channel to respond to Close + fd int // https://github.com/golang/go/issues/26439 can't call .Fd() on os.FIle or Read will no longer return on Close() + Events chan Event + Errors chan error + mu sync.Mutex // Map access + inotifyFile *os.File + watches map[string]*watch // Map of inotify watches (key: path) + paths map[int]string // Map of watched paths (key: watch descriptor) + done chan struct{} // Channel for sending a "quit message" to the reader goroutine + doneResp chan struct{} // Channel to respond to Close } // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { // Create inotify fd - fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC) + // Need to set the FD to nonblocking mode in order for SetDeadline methods to work + // Otherwise, blocking i/o operations won't terminate on close + fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK) if fd == -1 { return nil, errno } - // Create epoll - poller, err := newFdPoller(fd) - if err != nil { - unix.Close(fd) - return nil, err - } + w := &Watcher{ - fd: fd, - poller: poller, - watches: make(map[string]*watch), - paths: make(map[int]string), - Events: make(chan Event), - Errors: make(chan error), - done: make(chan struct{}), - doneResp: make(chan struct{}), + fd: fd, + inotifyFile: os.NewFile(uintptr(fd), ""), + watches: make(map[string]*watch), + paths: make(map[int]string), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan struct{}), + doneResp: make(chan struct{}), } go w.readEvents() @@ -82,8 +79,11 @@ func (w *Watcher) Close() error { close(w.done) w.mu.Unlock() - // Wake up goroutine - w.poller.wake() + // Causes any blocking reads to return with an error, provided the file still supports deadline operations + err := w.inotifyFile.Close() + if err != nil { + return err + } // Wait for goroutine to close <-w.doneResp @@ -189,16 +189,12 @@ type watch struct { func (w *Watcher) readEvents() { var ( buf [unix.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events - n int // Number of bytes read with read() errno error // Syscall errno - ok bool // For poller.wait ) defer close(w.doneResp) defer close(w.Errors) defer close(w.Events) - defer unix.Close(w.fd) - defer w.poller.close() for { // See if we have been closed. @@ -206,33 +202,19 @@ func (w *Watcher) readEvents() { return } - ok, errno = w.poller.wait() - if errno != nil { + n, err := w.inotifyFile.Read(buf[:]) + switch { + case errors.Unwrap(err) == os.ErrClosed: + return + case err != nil: select { - case w.Errors <- errno: + case w.Errors <- err: case <-w.done: return } continue } - if !ok { - continue - } - - n, errno = unix.Read(w.fd, buf[:]) - // If a signal interrupted execution, see if we've been asked to close, and try again. - // http://man7.org/linux/man-pages/man7/signal.7.html : - // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable" - if errno == unix.EINTR { - continue - } - - // unix.Read might have been woken up by Close. If so, we're done. - if w.isClosed() { - return - } - if n < unix.SizeofInotifyEvent { var err error if n == 0 { @@ -315,21 +297,7 @@ func (w *Watcher) readEvents() { // channel. Such as events marked ignore by the kernel, or MODIFY events // against files that do not exist. func (e *Event) ignoreLinux(mask uint32) bool { - // Ignore anything the inotify API says to ignore - if mask&unix.IN_IGNORED == unix.IN_IGNORED { - return true - } - - // If the event is Create or Write, the file must exist, or the - // event will be suppressed. - // *Note*: this was put in place because it was seen that a Write - // event was sent after the Remove. This ignores the Write and - // assumes a Remove will come or has come if the file doesn't exist. - if e.Op&Create == Create || e.Op&Write == Write { - _, statErr := os.Lstat(e.Name) - return os.IsNotExist(statErr) - } - return false + return mask&unix.IN_IGNORED == unix.IN_IGNORED } // newEvent returns an platform-independent Event based on an inotify mask. diff --git a/inotify_poller.go b/inotify_poller.go deleted file mode 100644 index b572a37c..00000000 --- a/inotify_poller.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build linux -// +build linux - -package fsnotify - -import ( - "errors" - - "golang.org/x/sys/unix" -) - -type fdPoller struct { - fd int // File descriptor (as returned by the inotify_init() syscall) - epfd int // Epoll file descriptor - pipe [2]int // Pipe for waking up -} - -func emptyPoller(fd int) *fdPoller { - poller := new(fdPoller) - poller.fd = fd - poller.epfd = -1 - poller.pipe[0] = -1 - poller.pipe[1] = -1 - return poller -} - -// Create a new inotify poller. -// This creates an inotify handler, and an epoll handler. -func newFdPoller(fd int) (*fdPoller, error) { - var errno error - poller := emptyPoller(fd) - defer func() { - if errno != nil { - poller.close() - } - }() - - // Create epoll fd - poller.epfd, errno = unix.EpollCreate1(unix.EPOLL_CLOEXEC) - if poller.epfd == -1 { - return nil, errno - } - // Create pipe; pipe[0] is the read end, pipe[1] the write end. - errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC) - if errno != nil { - return nil, errno - } - - // Register inotify fd with epoll - event := unix.EpollEvent{ - Fd: int32(poller.fd), - Events: unix.EPOLLIN, - } - errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event) - if errno != nil { - return nil, errno - } - - // Register pipe fd with epoll - event = unix.EpollEvent{ - Fd: int32(poller.pipe[0]), - Events: unix.EPOLLIN, - } - errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event) - if errno != nil { - return nil, errno - } - - return poller, nil -} - -// Wait using epoll. -// Returns true if something is ready to be read, -// false if there is not. -func (poller *fdPoller) wait() (bool, error) { - // 3 possible events per fd, and 2 fds, makes a maximum of 6 events. - // I don't know whether epoll_wait returns the number of events returned, - // or the total number of events ready. - // I decided to catch both by making the buffer one larger than the maximum. - events := make([]unix.EpollEvent, 7) - for { - n, errno := unix.EpollWait(poller.epfd, events, -1) - if n == -1 { - if errno == unix.EINTR { - continue - } - return false, errno - } - if n == 0 { - // If there are no events, try again. - continue - } - if n > 6 { - // This should never happen. More events were returned than should be possible. - return false, errors.New("epoll_wait returned more events than I know what to do with") - } - ready := events[:n] - epollhup := false - epollerr := false - epollin := false - for _, event := range ready { - if event.Fd == int32(poller.fd) { - if event.Events&unix.EPOLLHUP != 0 { - // This should not happen, but if it does, treat it as a wakeup. - epollhup = true - } - if event.Events&unix.EPOLLERR != 0 { - // If an error is waiting on the file descriptor, we should pretend - // something is ready to read, and let unix.Read pick up the error. - epollerr = true - } - if event.Events&unix.EPOLLIN != 0 { - // There is data to read. - epollin = true - } - } - if event.Fd == int32(poller.pipe[0]) { - if event.Events&unix.EPOLLHUP != 0 { - // Write pipe descriptor was closed, by us. This means we're closing down the - // watcher, and we should wake up. - } - if event.Events&unix.EPOLLERR != 0 { - // If an error is waiting on the pipe file descriptor. - // This is an absolute mystery, and should never ever happen. - return false, errors.New("Error on the pipe descriptor.") - } - if event.Events&unix.EPOLLIN != 0 { - // This is a regular wakeup, so we have to clear the buffer. - err := poller.clearWake() - if err != nil { - return false, err - } - } - } - } - - if epollhup || epollerr || epollin { - return true, nil - } - return false, nil - } -} - -// Close the write end of the poller. -func (poller *fdPoller) wake() error { - buf := make([]byte, 1) - n, errno := unix.Write(poller.pipe[1], buf) - if n == -1 { - if errno == unix.EAGAIN { - // Buffer is full, poller will wake. - return nil - } - return errno - } - return nil -} - -func (poller *fdPoller) clearWake() error { - // You have to be woken up a LOT in order to get to 100! - buf := make([]byte, 100) - n, errno := unix.Read(poller.pipe[0], buf) - if n == -1 { - if errno == unix.EAGAIN { - // Buffer is empty, someone else cleared our wake. - return nil - } - return errno - } - return nil -} - -// Close all poller file descriptors, but not the one passed to it. -func (poller *fdPoller) close() { - if poller.pipe[1] != -1 { - unix.Close(poller.pipe[1]) - } - if poller.pipe[0] != -1 { - unix.Close(poller.pipe[0]) - } - if poller.epfd != -1 { - unix.Close(poller.epfd) - } -} diff --git a/inotify_poller_test.go b/inotify_poller_test.go deleted file mode 100644 index 110e00db..00000000 --- a/inotify_poller_test.go +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2015 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -//go:build linux -// +build linux - -package fsnotify - -import ( - "testing" - "time" - - "golang.org/x/sys/unix" -) - -type testFd [2]int - -func makeTestFd(t *testing.T) testFd { - var tfd testFd - errno := unix.Pipe(tfd[:]) - if errno != nil { - t.Fatalf("Failed to create pipe: %v", errno) - } - return tfd -} - -func (tfd testFd) fd() int { - return tfd[0] -} - -func (tfd testFd) closeWrite(t *testing.T) { - errno := unix.Close(tfd[1]) - if errno != nil { - t.Fatalf("Failed to close write end of pipe: %v", errno) - } -} - -func (tfd testFd) put(t *testing.T) { - buf := make([]byte, 10) - _, errno := unix.Write(tfd[1], buf) - if errno != nil { - t.Fatalf("Failed to write to pipe: %v", errno) - } -} - -func (tfd testFd) get(t *testing.T) { - buf := make([]byte, 10) - _, errno := unix.Read(tfd[0], buf) - if errno != nil { - t.Fatalf("Failed to read from pipe: %v", errno) - } -} - -func (tfd testFd) close() { - unix.Close(tfd[1]) - unix.Close(tfd[0]) -} - -func makePoller(t *testing.T) (testFd, *fdPoller) { - tfd := makeTestFd(t) - poller, err := newFdPoller(tfd.fd()) - if err != nil { - t.Fatalf("Failed to create poller: %v", err) - } - return tfd, poller -} - -func TestPollerWithBadFd(t *testing.T) { - _, err := newFdPoller(-1) - if err != unix.EBADF { - t.Fatalf("Expected EBADF, got: %v", err) - } -} - -func TestPollerWithData(t *testing.T) { - tfd, poller := makePoller(t) - defer tfd.close() - defer poller.close() - - tfd.put(t) - ok, err := poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if !ok { - t.Fatalf("expected poller to return true") - } - tfd.get(t) -} - -func TestPollerWithWakeup(t *testing.T) { - tfd, poller := makePoller(t) - defer tfd.close() - defer poller.close() - - err := poller.wake() - if err != nil { - t.Fatalf("wake failed: %v", err) - } - ok, err := poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if ok { - t.Fatalf("expected poller to return false") - } -} - -func TestPollerWithClose(t *testing.T) { - tfd, poller := makePoller(t) - defer tfd.close() - defer poller.close() - - tfd.closeWrite(t) - ok, err := poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if !ok { - t.Fatalf("expected poller to return true") - } -} - -func TestPollerWithWakeupAndData(t *testing.T) { - tfd, poller := makePoller(t) - defer tfd.close() - defer poller.close() - - tfd.put(t) - err := poller.wake() - if err != nil { - t.Fatalf("wake failed: %v", err) - } - - // both data and wakeup - ok, err := poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if !ok { - t.Fatalf("expected poller to return true") - } - - // data is still in the buffer, wakeup is cleared - ok, err = poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if !ok { - t.Fatalf("expected poller to return true") - } - - tfd.get(t) - // data is gone, only wakeup now - err = poller.wake() - if err != nil { - t.Fatalf("wake failed: %v", err) - } - ok, err = poller.wait() - if err != nil { - t.Fatalf("poller failed: %v", err) - } - if ok { - t.Fatalf("expected poller to return false") - } -} - -func TestPollerConcurrent(t *testing.T) { - tfd, poller := makePoller(t) - defer tfd.close() - defer poller.close() - - oks := make(chan bool) - live := make(chan bool) - defer close(live) - go func() { - defer close(oks) - for { - ok, err := poller.wait() - if err != nil { - t.Errorf("poller failed: %v", err) - } - oks <- ok - if !<-live { - return - } - } - }() - - // Try a write - select { - case <-time.After(50 * time.Millisecond): - case <-oks: - t.Fatalf("poller did not wait") - } - tfd.put(t) - if !<-oks { - t.Fatalf("expected true") - } - tfd.get(t) - live <- true - - // Try a wakeup - select { - case <-time.After(50 * time.Millisecond): - case <-oks: - t.Fatalf("poller did not wait") - } - err := poller.wake() - if err != nil { - t.Fatalf("wake failed: %v", err) - } - if <-oks { - t.Fatalf("expected false") - } - live <- true - - // Try a close - select { - case <-time.After(50 * time.Millisecond): - case <-oks: - t.Fatalf("poller did not wait") - } - tfd.closeWrite(t) - if !<-oks { - t.Fatalf("expected true") - } - tfd.get(t) - - // wait for all goroutines for finish. - live <- false - <-oks -} diff --git a/inotify_test.go b/inotify_test.go index 269c3ff8..fb5c717d 100644 --- a/inotify_test.go +++ b/inotify_test.go @@ -11,7 +11,9 @@ import ( "errors" "fmt" "os" + "os/exec" "path/filepath" + "strconv" "strings" "sync" "testing" @@ -550,3 +552,40 @@ func TestInotifyDeleteOpenedFile(t *testing.T) { fd.Close() checkEvent(Remove) } + +func TestINotifyNoBlockingSyscalls(t *testing.T) { + getThreads := func() int { + cmd := fmt.Sprintf("ls /proc/%d/task | wc -l", os.Getpid()) + output, err := exec.Command("/bin/bash", "-c", cmd).Output() + if err != nil { + t.Fatalf("Failed to execute command to check number of threads, err %s", err) + } + + n, err := strconv.ParseInt(strings.Trim(string(output), "\n"), 10, 64) + if err != nil { + t.Fatalf("Failed to parse output as int, err: %s", err) + } + return int(n) + } + + w, err := NewWatcher() + if err != nil { + t.Fatalf("Failed to create watcher: %v", err) + } + + startingThreads := getThreads() + // Call readEvents a bunch of times; if this function has a blocking raw syscall, it'll create many new kthreads + for i := 0; i <= 60; i++ { + go w.readEvents() + } + + // Bad synchronization mechanism + time.Sleep(time.Second * 2) + + endingThreads := getThreads() + + // Did we spawn any new threads? + if diff := endingThreads - startingThreads; diff > 0 { + t.Fatalf("Got a nonzero diff %v. starting: %v. ending: %v", diff, startingThreads, endingThreads) + } +}