Skip to content

Commit

Permalink
add WakeupEvents support to perf Reader
Browse files Browse the repository at this point in the history
Signed-off-by: Bryce Kahle <bryce.kahle@datadoghq.com>
  • Loading branch information
brycekahle authored and lmb committed Apr 3, 2024
1 parent 5a7f946 commit 9c1d099
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 18 deletions.
9 changes: 8 additions & 1 deletion perf/reader.go
Expand Up @@ -169,6 +169,10 @@ type Reader struct {
// ReaderOptions control the behaviour of the user
// space reader.
type ReaderOptions struct {
// The number of events required in any per CPU buffer before
// Read will process data. This is mutually exclusive with Watermark.
// The default is zero, which means Watermark will take precedence.
WakeupEvents int
// The number of written bytes required in any per CPU buffer before
// Read will process data. Must be smaller than PerCPUBuffer.
// The default is to start processing as soon as data is available.
Expand All @@ -192,6 +196,9 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
if perCPUBuffer < 1 {
return nil, errors.New("perCPUBuffer must be larger than 0")
}
if opts.WakeupEvents > 0 && opts.Watermark > 0 {
return nil, errors.New("WakeupEvents and Watermark cannot both be non-zero")
}

var (
fds []int
Expand Down Expand Up @@ -224,7 +231,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
// Hence we have to create a ring for each CPU.
bufferSize := 0
for i := 0; i < nCPU; i++ {
ring, err := newPerfEventRing(i, perCPUBuffer, opts.Watermark, opts.Overwritable)
ring, err := newPerfEventRing(i, perCPUBuffer, opts)
if errors.Is(err, unix.ENODEV) {
// The requested CPU is currently offline, skip it.
rings = append(rings, nil)
Expand Down
42 changes: 41 additions & 1 deletion perf/reader_test.go
Expand Up @@ -382,7 +382,7 @@ func TestPerfReaderClose(t *testing.T) {
}

func TestCreatePerfEvent(t *testing.T) {
fd, err := createPerfEvent(0, 1, false)
fd, err := createPerfEvent(0, ReaderOptions{Watermark: 1, Overwritable: false})
if err != nil {
t.Fatal("Can't create perf event:", err)
}
Expand Down Expand Up @@ -489,6 +489,46 @@ func TestPause(t *testing.T) {
qt.Assert(t, qt.ErrorIs(err, ErrClosed), qt.Commentf("doesn't wrap ErrClosed"))
}

func TestPerfReaderWakeupEvents(t *testing.T) {
events := perfEventArray(t)

numEvents := 2
rd, err := NewReaderWithOptions(events, 4096, ReaderOptions{WakeupEvents: numEvents})
if err != nil {
t.Fatal(err)
}
defer rd.Close()

// Write a sample. The reader should read it.
prog := outputSamplesProg(t, events, 5)
ret, _, err := prog.Test(internal.EmptyBPFContext)
testutils.SkipIfNotSupported(t, err)
if err != nil || ret != 0 {
t.Fatal("Can't write sample")
}

if errno := syscall.Errno(-int32(ret)); errno != 0 {
t.Fatal("Expected 0 as return value, got", errno)
}

rd.SetDeadline(time.Now().Add(10 * time.Millisecond))
_, err = rd.Read()
qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded), qt.Commentf("expected os.ErrDeadlineExceeded"))

// send followup events
for i := 1; i < numEvents; i++ {
_, _, err = prog.Test(internal.EmptyBPFContext)
if err != nil {
t.Fatal(err)
}
}

rd.SetDeadline(time.Time{})
for i := 0; i < numEvents; i++ {
checkRecord(t, rd)
}
}

func BenchmarkReader(b *testing.B) {
events := perfEventArray(b)
prog := outputSamplesProg(b, events, 80)
Expand Down
29 changes: 18 additions & 11 deletions perf/ring.go
Expand Up @@ -22,12 +22,12 @@ type perfEventRing struct {
ringReader
}

func newPerfEventRing(cpu, perCPUBuffer, watermark int, overwritable bool) (*perfEventRing, error) {
if watermark >= perCPUBuffer {
func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing, error) {
if opts.Watermark >= perCPUBuffer {
return nil, errors.New("watermark must be smaller than perCPUBuffer")
}

fd, err := createPerfEvent(cpu, watermark, overwritable)
fd, err := createPerfEvent(cpu, opts)
if err != nil {
return nil, err
}
Expand All @@ -38,7 +38,7 @@ func newPerfEventRing(cpu, perCPUBuffer, watermark int, overwritable bool) (*per
}

protections := unix.PROT_READ
if !overwritable {
if !opts.Overwritable {
protections |= unix.PROT_WRITE
}

Expand All @@ -55,7 +55,7 @@ func newPerfEventRing(cpu, perCPUBuffer, watermark int, overwritable bool) (*per
meta := (*unix.PerfEventMmapPage)(unsafe.Pointer(&mmap[0]))

var reader ringReader
if overwritable {
if opts.Overwritable {
reader = newReverseReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size])
} else {
reader = newForwardReader(meta, mmap[meta.Data_offset:meta.Data_offset+meta.Data_size])
Expand Down Expand Up @@ -98,13 +98,20 @@ func (ring *perfEventRing) Close() {
ring.mmap = nil
}

func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) {
if watermark == 0 {
watermark = 1
func createPerfEvent(cpu int, opts ReaderOptions) (int, error) {
wakeup := 0
bits := 0
if opts.WakeupEvents > 0 {
wakeup = opts.WakeupEvents
} else {
wakeup = opts.Watermark
if wakeup == 0 {
wakeup = 1
}
bits |= unix.PerfBitWatermark
}

bits := unix.PerfBitWatermark
if overwritable {
if opts.Overwritable {
bits |= unix.PerfBitWriteBackward
}

Expand All @@ -113,7 +120,7 @@ func createPerfEvent(cpu, watermark int, overwritable bool) (int, error) {
Config: unix.PERF_COUNT_SW_BPF_OUTPUT,
Bits: uint64(bits),
Sample_type: unix.PERF_SAMPLE_RAW,
Wakeup: uint32(watermark),
Wakeup: uint32(wakeup),
}

attr.Size = uint32(unsafe.Sizeof(attr))
Expand Down
10 changes: 5 additions & 5 deletions perf/ring_test.go
Expand Up @@ -131,7 +131,7 @@ func makeForwardRing(size, offset int) *forwardReader {

func TestPerfEventRing(t *testing.T) {
check := func(buffer, watermark int, overwritable bool) {
ring, err := newPerfEventRing(0, buffer, watermark, overwritable)
ring, err := newPerfEventRing(0, buffer, ReaderOptions{Watermark: watermark, Overwritable: overwritable})
if err != nil {
t.Fatal(err)
}
Expand All @@ -154,21 +154,21 @@ func TestPerfEventRing(t *testing.T) {
}

// watermark > buffer
_, err := newPerfEventRing(0, 8192, 8193, false)
_, err := newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: false})
if err == nil {
t.Fatal("watermark > buffer allowed")
}
_, err = newPerfEventRing(0, 8192, 8193, true)
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
if err == nil {
t.Fatal("watermark > buffer allowed")
}

// watermark == buffer
_, err = newPerfEventRing(0, 8192, 8192, false)
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8192, Overwritable: false})
if err == nil {
t.Fatal("watermark == buffer allowed")
}
_, err = newPerfEventRing(0, 8192, 8192, true)
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
if err == nil {
t.Fatal("watermark == buffer allowed")
}
Expand Down

0 comments on commit 9c1d099

Please sign in to comment.