Skip to content

Commit

Permalink
perf: flush Record when deadline is exceeded
Browse files Browse the repository at this point in the history
Watermark and WakeupEvents allow reducing the rate at which user
space is woken up. This creates an edge case where Read() returns
os.ErrDeadlineExceeded even though there is data in one of the rings.

This is also a problem in the ringbuf package, where we've solved this
by checking whether there is any data when encountering
an ErrDeadlineExceeded. Adopt the same approach for the perf reader.

TestPerfReaderWakeupEvents has to change since it relies on the old
behaviour of not checking for pending data. Rework the test so that
it doesn't test the ErrDeadlineExceeded error: it's not so important
that we don't wake up early.

Signed-off-by: Lorenz Bauer <lmb@isovalent.com>
  • Loading branch information
lmb committed May 10, 2024
1 parent 75bce38 commit 5650724
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
22 changes: 19 additions & 3 deletions perf/reader.go
Expand Up @@ -327,7 +327,11 @@ func (pr *Reader) SetDeadline(t time.Time) {
//
// Calling Close interrupts the function.
//
// Returns os.ErrDeadlineExceeded if a deadline was set.
// Returns [os.ErrDeadlineExceeded] if a deadline was set and the perf ring buffer
// was empty. Otherwise returns a record and no error, even if the deadline was
// exceeded.
//
// See [Reader.ReadInto] for a more efficient version of this method.
func (pr *Reader) Read() (Record, error) {
var r Record

Expand All @@ -336,7 +340,7 @@ func (pr *Reader) Read() (Record, error) {

var errMustBePaused = fmt.Errorf("perf ringbuffer: must have been paused before reading overwritable buffer")

// ReadInto is like Read except that it allows reusing Record and associated buffers.
// ReadInto is like [Reader.Read] except that it allows reusing Record and associated buffers.
func (pr *Reader) ReadInto(rec *Record) error {
pr.mu.Lock()
defer pr.mu.Unlock()
Expand All @@ -352,14 +356,26 @@ func (pr *Reader) ReadInto(rec *Record) error {
return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
}

deadlineWasExceeded := false
for {
if len(pr.epollRings) == 0 {
if deadlineWasExceeded {
// All rings were empty when the deadline expired, return
// appropriate error.
return os.ErrDeadlineExceeded
}

// NB: The deferred pauseMu.Unlock will panic if Wait panics, which
// might obscure the original panic.
pr.pauseMu.Unlock()
_, err := pr.poller.Wait(pr.epollEvents, pr.deadline)
pr.pauseMu.Lock()
if err != nil {

if errors.Is(err, os.ErrDeadlineExceeded) {
// We've hit the deadline, check whether there is any data in
// the rings that we've not been woken up for.
deadlineWasExceeded = true
} else if err != nil {
return err
}

Expand Down
50 changes: 30 additions & 20 deletions perf/reader_test.go
Expand Up @@ -500,36 +500,46 @@ func TestPerfReaderWakeupEvents(t *testing.T) {
}
defer rd.Close()

// Write a sample. The reader should read it.
prog := outputSamplesProg(t, events, 5)
ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil || ret != 0 {
t.Fatal("Can't write sample")
}

if errno := syscall.Errno(-int32(ret)); errno != 0 {
t.Fatal("Expected 0 as return value, got", errno)
}

rd.SetDeadline(time.Now().Add(10 * time.Millisecond))
_, err = rd.Read()
qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded), qt.Commentf("expected os.ErrDeadlineExceeded"))

// send followup events
for i := 1; i < numEvents; i++ {
// Send enough events to trigger WakeupEvents.
for i := 0; i < numEvents; i++ {
_, _, err = prog.Test(internal.EmptyBPFContext)
if err != nil {
t.Fatal(err)
}
testutils.SkipIfNotSupported(t, err)
qt.Assert(t, qt.IsNil(err))
}

rd.SetDeadline(time.Time{})
time.AfterFunc(5*time.Second, func() {
// Interrupt Read() in case the implementation is buggy.
rd.Close()
})

for i := 0; i < numEvents; i++ {
checkRecord(t, rd)
}
}

func TestReadWithoutWakeup(t *testing.T) {
t.Parallel()

events := perfEventArray(t)

rd, err := NewReaderWithOptions(events, 1, ReaderOptions{WakeupEvents: 2})
if err != nil {
t.Fatal(err)
}
defer rd.Close()

prog := outputSamplesProg(t, events, 5)
ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
qt.Assert(t, qt.IsNil(err))
qt.Assert(t, qt.Equals(ret, 0))

rd.SetDeadline(time.Now())
checkRecord(t, rd)
}

func BenchmarkReader(b *testing.B) {
events := perfEventArray(b)
prog := outputSamplesProg(b, events, 80)
Expand Down

0 comments on commit 5650724

Please sign in to comment.