perf: add Reader.ReadInto
Allow reading from a Reader without allocations, once a steady state
has been reached.

    name          time/op
    Reader-4      8.84µs ± 3%
    ReadInto-4    8.83µs ± 4%

    name          alloc/op
    Reader-4        384B ± 0%
    ReadInto-4      288B ± 0%

    name          allocs/op
    Reader-4        2.00 ± 0%
    ReadInto-4      1.00 ± 0%
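
The numbers above reflect the intended steady-state pattern: a single Record is reused across calls, so its RawSample buffer is only reallocated when a sample arrives that is larger than any seen before. A minimal sketch of that pattern (the `consume` helper and its error handling are illustrative, not part of this change):

    package main

    import (
    	"fmt"

    	"github.com/cilium/ebpf"
    	"github.com/cilium/ebpf/perf"
    )

    // consume drains samples from a PERF_EVENT_ARRAY map, reusing a
    // single Record so that steady-state reads stay allocation-free.
    func consume(events *ebpf.Map) error {
    	rd, err := perf.NewReader(events, 4096)
    	if err != nil {
    		return err
    	}
    	defer rd.Close()

    	var rec perf.Record
    	for {
    		if err := rd.ReadInto(&rec); err != nil {
    			return err
    		}
    		// rec.RawSample may be overwritten by the next ReadInto,
    		// so consume or copy the data before the next iteration.
    		fmt.Printf("cpu %d: % x\n", rec.CPU, rec.RawSample)
    	}
    }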
lmb committed May 10, 2022
1 parent e082579 commit 454f0e7
Showing 2 changed files with 133 additions and 30 deletions.
perf/reader.go (68 additions, 29 deletions)
@@ -20,6 +20,8 @@ var (
 	errEOR = errors.New("end of ring")
 )
 
+var perfEventHeaderSize = binary.Size(perfEventHeader{})
+
 // perfEventHeader must match `struct perf_event_header` in <linux/perf_event.h>.
 type perfEventHeader struct {
 	Type uint32
@@ -47,34 +49,39 @@ type Record struct {
 	LostSamples uint64
 }
 
-// NB: Has to be preceded by a call to ring.loadHead.
-func readRecordFromRing(ring *perfEventRing) (Record, error) {
-	defer ring.writeTail()
-	return readRecord(ring, ring.cpu)
-}
-
-func readRecord(rd io.Reader, cpu int) (Record, error) {
-	var header perfEventHeader
-	err := binary.Read(rd, internal.NativeEndian, &header)
-	if err == io.EOF {
-		return Record{}, errEOR
+// Read a record from a reader into rec.
+//
+// buf must be at least perfEventHeaderSize bytes long.
+func readRecord(rd io.Reader, rec *Record, buf []byte) error {
+	// Assert that the buffer is large enough.
+	buf = buf[:perfEventHeaderSize]
+	_, err := io.ReadFull(rd, buf)
+	if errors.Is(err, io.EOF) {
+		return errEOR
+	} else if err != nil {
+		return fmt.Errorf("read perf event header: %v", err)
 	}
 
-	if err != nil {
-		return Record{}, fmt.Errorf("can't read event header: %v", err)
+	header := perfEventHeader{
+		internal.NativeEndian.Uint32(buf[0:4]),
+		internal.NativeEndian.Uint16(buf[4:6]),
+		internal.NativeEndian.Uint16(buf[6:8]),
 	}
 
 	switch header.Type {
 	case unix.PERF_RECORD_LOST:
-		lost, err := readLostRecords(rd)
-		return Record{CPU: cpu, LostSamples: lost}, err
+		rec.RawSample = rec.RawSample[:0]
+		rec.LostSamples, err = readLostRecords(rd)
+		return err
 
 	case unix.PERF_RECORD_SAMPLE:
-		sample, err := readRawSample(rd)
-		return Record{CPU: cpu, RawSample: sample}, err
+		rec.LostSamples = 0
+		// We can reuse buf here because perfEventHeaderSize > perfEventSampleSize.
+		rec.RawSample, err = readRawSample(rd, buf, rec.RawSample)
+		return err
 
 	default:
-		return Record{}, &unknownEventError{header.Type}
+		return &unknownEventError{header.Type}
 	}
 }

@@ -93,16 +100,32 @@ func readLostRecords(rd io.Reader) (uint64, error) {
 	return lostHeader.Lost, nil
 }
 
-func readRawSample(rd io.Reader) ([]byte, error) {
-	// This must match 'struct perf_event_sample in kernel sources.
-	var size uint32
-	if err := binary.Read(rd, internal.NativeEndian, &size); err != nil {
-		return nil, fmt.Errorf("can't read sample size: %v", err)
+var perfEventSampleSize = binary.Size(uint32(0))
+
+// This must match `struct perf_event_sample` in kernel sources.
+type perfEventSample struct {
+	Size uint32
+}
+
+func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
+	buf = buf[:perfEventSampleSize]
+	if _, err := io.ReadFull(rd, buf); err != nil {
+		return nil, fmt.Errorf("read sample size: %v", err)
 	}
 
+	sample := perfEventSample{
+		internal.NativeEndian.Uint32(buf),
+	}
+
+	var data []byte
+	if size := int(sample.Size); cap(sampleBuf) < size {
+		data = make([]byte, size)
+	} else {
+		data = sampleBuf[:size]
+	}
+
-	data := make([]byte, int(size))
 	if _, err := io.ReadFull(rd, data); err != nil {
-		return nil, fmt.Errorf("can't read sample: %v", err)
+		return nil, fmt.Errorf("read sample: %v", err)
 	}
 	return data, nil
 }
@@ -123,6 +146,7 @@ type Reader struct {
 	rings       []*perfEventRing
 	epollEvents []unix.EpollEvent
 	epollRings  []*perfEventRing
+	eventHeader []byte
 
 	// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
 	// These allow Pause/Resume to be executed independently of any ongoing
@@ -215,6 +239,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
 		poller:      poller,
 		epollEvents: make([]unix.EpollEvent, len(rings)),
 		epollRings:  make([]*perfEventRing, 0, len(rings)),
+		eventHeader: make([]byte, perfEventHeaderSize),
 		pauseFds:    pauseFds,
 	}
 	if err = pr.Resume(); err != nil {
@@ -266,18 +291,24 @@ func (pr *Reader) Close() error {
 //
 // Calling Close interrupts the function.
 func (pr *Reader) Read() (Record, error) {
+	var r Record
+	return r, pr.ReadInto(&r)
+}
+
+// ReadInto is like Read except that it allows reusing Record and associated buffers.
+func (pr *Reader) ReadInto(rec *Record) error {
 	pr.mu.Lock()
 	defer pr.mu.Unlock()
 
 	if pr.rings == nil {
-		return Record{}, fmt.Errorf("perf ringbuffer: %w", ErrClosed)
+		return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
 	}
 
 	for {
 		if len(pr.epollRings) == 0 {
 			nEvents, err := pr.poller.Wait(pr.epollEvents)
 			if err != nil {
-				return Record{}, err
+				return err
 			}
 
 			for _, event := range pr.epollEvents[:nEvents] {
@@ -294,15 +325,15 @@ func (pr *Reader) Read() (Record, error) {
 		// Start at the last available event. The order in which we
 		// process them doesn't matter, and starting at the back allows
 		// resizing epollRings to keep track of processed rings.
-		record, err := readRecordFromRing(pr.epollRings[len(pr.epollRings)-1])
+		err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1])
 		if err == errEOR {
 			// We've emptied the current ring buffer, process
 			// the next one.
 			pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
 			continue
 		}
 
-		return record, err
+		return err
 	}
 }

@@ -353,6 +384,14 @@ func (pr *Reader) Resume() error {
 	return nil
 }
 
+// NB: Has to be preceded by a call to ring.loadHead.
+func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
+	defer ring.writeTail()
+
+	rec.CPU = ring.cpu
+	return readRecord(ring, rec, pr.eventHeader)
+}
+
 type unknownEventError struct {
 	eventType uint32
 }
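
Aside on the design choice above: the allocation removed per read comes from decoding. binary.Read allocates an internal scratch buffer (and uses reflection for structs) on every call, while io.ReadFull into a caller-owned buffer plus fixed-offset decoding does not; the sample payload is additionally recycled through rec.RawSample. A rough, hypothetical micro-benchmark of just the decoding difference (not part of this change):

    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"io"
    	"testing"
    )

    type header struct {
    	Type uint32
    	Misc uint16
    	Size uint16
    }

    var sink header

    // binary.Read allocates a fresh intermediate buffer on every call.
    func BenchmarkBinaryRead(b *testing.B) {
    	src := bytes.NewReader(make([]byte, 8))
    	b.ReportAllocs()
    	for i := 0; i < b.N; i++ {
    		src.Seek(0, io.SeekStart)
    		binary.Read(src, binary.LittleEndian, &sink)
    	}
    }

    // Decoding by hand from a reused scratch buffer does not allocate.
    func BenchmarkManualDecode(b *testing.B) {
    	src := bytes.NewReader(make([]byte, 8))
    	buf := make([]byte, 8)
    	b.ReportAllocs()
    	for i := 0; i < b.N; i++ {
    		src.Seek(0, io.SeekStart)
    		io.ReadFull(src, buf)
    		sink = header{
    			binary.LittleEndian.Uint32(buf[0:4]),
    			binary.LittleEndian.Uint16(buf[4:6]),
    			binary.LittleEndian.Uint16(buf[6:8]),
    		}
    	}
    }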
perf/reader_test.go (65 additions, 1 deletion)
@@ -294,7 +294,8 @@ func TestReadRecord(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, err = readRecord(&buf, 0)
+	var rec Record
+	err = readRecord(&buf, &rec, make([]byte, perfEventHeaderSize))
 	if !IsUnknownEvent(err) {
 		t.Error("readRecord should return unknown event error, got", err)
 	}
@@ -415,6 +416,39 @@ func BenchmarkReader(b *testing.B) {
 	}
 }
 
+func BenchmarkReadInto(b *testing.B) {
+	prog, events := mustOutputSamplesProg(b, 80)
+	defer prog.Close()
+	defer events.Close()
+
+	rd, err := NewReader(events, 4096)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer rd.Close()
+
+	buf := make([]byte, 14)
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	var rec Record
+	for i := 0; i < b.N; i++ {
+		// NB: Submitting samples into the perf event ring dominates
+		// the benchmark time unfortunately.
+		ret, _, err := prog.Test(buf)
+		if err != nil {
+			b.Fatal(err)
+		} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
+			b.Fatal("Expected 0 as return value, got", errno)
+		}
+
+		if err := rd.ReadInto(&rec); err != nil {
+			b.Fatal(err)
+		}
+	}
+}

// This exists just to make the example below nicer.
func bpfPerfEventOutputProgram() (*ebpf.Program, *ebpf.Map) {
prog, events, err := outputSamplesProg(5)
Expand Down Expand Up @@ -468,3 +502,33 @@ func ExampleReader() {
// Data is padded with 0 for alignment
fmt.Println("Sample:", record.RawSample)
}

// ReadRecord allows reducing memory allocations.
func ExampleReader_ReadInto() {
prog, events := bpfPerfEventOutputProgram()
defer prog.Close()
defer events.Close()

rd, err := NewReader(events, 4096)
if err != nil {
panic(err)
}
defer rd.Close()

for i := 0; i < 2; i++ {
// Write out two samples
ret, _, err := prog.Test(make([]byte, 14))
if err != nil || ret != 0 {
panic("Can't write sample")
}
}

var rec Record
for i := 0; i < 2; i++ {
if err := rd.ReadInto(&rec); err != nil {
panic(err)
}

fmt.Println("Sample:", rec.RawSample[:5])
}
}
