perf, ringbuf: add ReadInto methods #663

Merged 3 commits on May 10, 2022
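Both readers gain a ReadInto method that reuses a caller-supplied Record, and with it the RawSample backing array, across reads instead of allocating fresh buffers per event. A minimal consumer loop against the perf API added here; drain and handle are hypothetical names for this sketch, not part of the PR:

package sketch

import (
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
)

// drain reads events from a PERF_EVENT_ARRAY map into a single reused
// Record until the reader fails or is closed.
func drain(events *ebpf.Map, handle func([]byte)) error {
	rd, err := perf.NewReader(events, 4096)
	if err != nil {
		return err
	}
	defer rd.Close()

	var rec perf.Record
	for {
		if err := rd.ReadInto(&rec); err != nil {
			return err
		}
		if rec.LostSamples > 0 {
			log.Printf("lost %d samples on CPU %d", rec.LostSamples, rec.CPU)
			continue
		}
		handle(rec.RawSample)
	}
}

Because the buffer is reused, handle must copy rec.RawSample if it retains the data past the next ReadInto call.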
97 changes: 68 additions & 29 deletions perf/reader.go
@@ -20,6 +20,8 @@ var (
errEOR = errors.New("end of ring")
)

var perfEventHeaderSize = binary.Size(perfEventHeader{})

// perfEventHeader must match 'struct perf_event_header' in <linux/perf_event.h>.
type perfEventHeader struct {
Type uint32
@@ -47,34 +49,39 @@ type Record struct {
LostSamples uint64
}

// NB: Has to be preceded by a call to ring.loadHead.
func readRecordFromRing(ring *perfEventRing) (Record, error) {
defer ring.writeTail()
return readRecord(ring, ring.cpu)
}

func readRecord(rd io.Reader, cpu int) (Record, error) {
var header perfEventHeader
err := binary.Read(rd, internal.NativeEndian, &header)
if err == io.EOF {
return Record{}, errEOR
// Read a record from a reader into rec.
//
// buf must be at least perfEventHeaderSize bytes long.
func readRecord(rd io.Reader, rec *Record, buf []byte) error {
// Assert that the buffer is large enough.
buf = buf[:perfEventHeaderSize]
_, err := io.ReadFull(rd, buf)
if errors.Is(err, io.EOF) {
return errEOR
} else if err != nil {
return fmt.Errorf("read perf event header: %v", err)
}

if err != nil {
return Record{}, fmt.Errorf("can't read event header: %v", err)
header := perfEventHeader{
internal.NativeEndian.Uint32(buf[0:4]),
internal.NativeEndian.Uint16(buf[4:6]),
internal.NativeEndian.Uint16(buf[6:8]),
}

switch header.Type {
case unix.PERF_RECORD_LOST:
lost, err := readLostRecords(rd)
return Record{CPU: cpu, LostSamples: lost}, err
rec.RawSample = rec.RawSample[:0]
rec.LostSamples, err = readLostRecords(rd)
return err

case unix.PERF_RECORD_SAMPLE:
sample, err := readRawSample(rd)
return Record{CPU: cpu, RawSample: sample}, err
rec.LostSamples = 0
// We can reuse buf here because perfEventHeaderSize > perfEventSampleSize.
rec.RawSample, err = readRawSample(rd, buf, rec.RawSample)
return err

default:
return Record{}, &unknownEventError{header.Type}
return &unknownEventError{header.Type}
}
}

@@ -93,16 +100,32 @@ func readLostRecords(rd io.Reader) (uint64, error) {
return lostHeader.Lost, nil
}

func readRawSample(rd io.Reader) ([]byte, error) {
// This must match 'struct perf_event_sample' in kernel sources.
var size uint32
if err := binary.Read(rd, internal.NativeEndian, &size); err != nil {
return nil, fmt.Errorf("can't read sample size: %v", err)
var perfEventSampleSize = binary.Size(uint32(0))

// This must match 'struct perf_event_sample' in kernel sources.
type perfEventSample struct {
Size uint32
}

func readRawSample(rd io.Reader, buf, sampleBuf []byte) ([]byte, error) {
buf = buf[:perfEventSampleSize]
if _, err := io.ReadFull(rd, buf); err != nil {
return nil, fmt.Errorf("read sample size: %v", err)
}

sample := perfEventSample{
internal.NativeEndian.Uint32(buf),
}

var data []byte
if size := int(sample.Size); cap(sampleBuf) < size {
data = make([]byte, size)
} else {
data = sampleBuf[:size]
}

data := make([]byte, int(size))
if _, err := io.ReadFull(rd, data); err != nil {
return nil, fmt.Errorf("can't read sample: %v", err)
return nil, fmt.Errorf("read sample: %v", err)
}
return data, nil
}
@@ -123,6 +146,7 @@ type Reader struct {
rings []*perfEventRing
epollEvents []unix.EpollEvent
epollRings []*perfEventRing
eventHeader []byte

// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
// These allow Pause/Resume to be executed independently of any ongoing
@@ -215,6 +239,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
poller: poller,
epollEvents: make([]unix.EpollEvent, len(rings)),
epollRings: make([]*perfEventRing, 0, len(rings)),
eventHeader: make([]byte, perfEventHeaderSize),
pauseFds: pauseFds,
}
if err = pr.Resume(); err != nil {
@@ -266,18 +291,24 @@ func (pr *Reader) Close() error {
//
// Calling Close interrupts the function.
func (pr *Reader) Read() (Record, error) {
var r Record
return r, pr.ReadInto(&r)
}

// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (pr *Reader) ReadInto(rec *Record) error {
pr.mu.Lock()
defer pr.mu.Unlock()

if pr.rings == nil {
return Record{}, fmt.Errorf("perf ringbuffer: %w", ErrClosed)
return fmt.Errorf("perf ringbuffer: %w", ErrClosed)
}

for {
if len(pr.epollRings) == 0 {
nEvents, err := pr.poller.Wait(pr.epollEvents)
if err != nil {
return Record{}, err
return err
}

for _, event := range pr.epollEvents[:nEvents] {
@@ -294,15 +325,15 @@ func (pr *Reader) Read() (Record, error) {
// Start at the last available event. The order in which we
// process them doesn't matter, and starting at the back allows
// resizing epollRings to keep track of processed rings.
record, err := readRecordFromRing(pr.epollRings[len(pr.epollRings)-1])
err := pr.readRecordFromRing(rec, pr.epollRings[len(pr.epollRings)-1])
if err == errEOR {
// We've emptied the current ring buffer, process
// the next one.
pr.epollRings = pr.epollRings[:len(pr.epollRings)-1]
continue
}

return record, err
return err
}
}

@@ -353,6 +384,14 @@ func (pr *Reader) Resume() error {
return nil
}

// NB: Has to be preceded by a call to ring.loadHead.
func (pr *Reader) readRecordFromRing(rec *Record, ring *perfEventRing) error {
defer ring.writeTail()

rec.CPU = ring.cpu
return readRecord(ring, rec, pr.eventHeader)
}

type unknownEventError struct {
eventType uint32
}
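The allocation saving above comes from a reslice-or-grow pattern in readRawSample: keep the destination slice whenever its capacity suffices, and allocate only when it does not. The same idea as a standalone helper, assuming nothing beyond the standard library; grow is a hypothetical name, not part of this PR:

// grow returns a slice of length n, reusing buf's backing array when
// cap(buf) >= n and allocating a new one otherwise.
func grow(buf []byte, n int) []byte {
	if cap(buf) < n {
		return make([]byte, n)
	}
	return buf[:n]
}

A consequence worth noting: a Record filled by ReadInto may carry a backing array larger than the current sample, so consumers should rely on len(rec.RawSample) rather than its capacity.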
66 changes: 65 additions & 1 deletion perf/reader_test.go
@@ -294,7 +294,8 @@ func TestReadRecord(t *testing.T) {
t.Fatal(err)
}

_, err = readRecord(&buf, 0)
var rec Record
err = readRecord(&buf, &rec, make([]byte, perfEventHeaderSize))
if !IsUnknownEvent(err) {
t.Error("readRecord should return unknown event error, got", err)
}
@@ -415,6 +416,39 @@ func BenchmarkReader(b *testing.B) {
}
}

func BenchmarkReadInto(b *testing.B) {
prog, events := mustOutputSamplesProg(b, 80)
defer prog.Close()
defer events.Close()

rd, err := NewReader(events, 4096)
if err != nil {
b.Fatal(err)
}
defer rd.Close()

buf := make([]byte, 14)

b.ResetTimer()
b.ReportAllocs()

var rec Record
for i := 0; i < b.N; i++ {
// NB: Submitting samples into the perf event ring dominates
// the benchmark time unfortunately.
ret, _, err := prog.Test(buf)
if err != nil {
b.Fatal(err)
} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
b.Fatal("Expected 0 as return value, got", errno)
}

if err := rd.ReadInto(&rec); err != nil {
b.Fatal(err)
}
}
}

// This exists just to make the example below nicer.
func bpfPerfEventOutputProgram() (*ebpf.Program, *ebpf.Map) {
prog, events, err := outputSamplesProg(5)
@@ -468,3 +502,33 @@ func ExampleReader() {
// Data is padded with 0 for alignment
fmt.Println("Sample:", record.RawSample)
}

// ReadInto allows reducing memory allocations.
func ExampleReader_ReadInto() {
prog, events := bpfPerfEventOutputProgram()
defer prog.Close()
defer events.Close()

rd, err := NewReader(events, 4096)
if err != nil {
panic(err)
}
defer rd.Close()

for i := 0; i < 2; i++ {
// Write out two samples
ret, _, err := prog.Test(make([]byte, 14))
if err != nil || ret != 0 {
panic("Can't write sample")
}
}

var rec Record
for i := 0; i < 2; i++ {
if err := rd.ReadInto(&rec); err != nil {
panic(err)
}

fmt.Println("Sample:", rec.RawSample[:5])
}
}
56 changes: 38 additions & 18 deletions ringbuf/reader.go
@@ -20,6 +20,8 @@ var (
errBusy = errors.New("sample not committed yet")
)

var ringbufHeaderSize = binary.Size(ringbufHeader{})

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
@@ -42,23 +44,29 @@ type Record struct {
RawSample []byte
}

func readRecord(rd *ringbufEventRing) (r Record, err error) {
// Read a record from an event ring.
//
// buf must be at least ringbufHeaderSize bytes long.
func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {
rd.loadConsumer()
var header ringbufHeader
err = binary.Read(rd, internal.NativeEndian, &header)
if err == io.EOF {
return Record{}, err

buf = buf[:ringbufHeaderSize]
if _, err := io.ReadFull(rd, buf); err == io.EOF {
return err
} else if err != nil {
return fmt.Errorf("read event header: %w", err)
}

if err != nil {
return Record{}, fmt.Errorf("can't read event header: %w", err)
header := ringbufHeader{
internal.NativeEndian.Uint32(buf[0:4]),
internal.NativeEndian.Uint32(buf[4:8]),
}

if header.isBusy() {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
// and start again from the same position.
return Record{}, fmt.Errorf("%w", errBusy)
return fmt.Errorf("%w", errBusy)
}

/* read up to 8 byte alignment */
@@ -73,18 +81,22 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) {
rd.skipRead(dataLenAligned)
rd.storeConsumer()

return Record{}, fmt.Errorf("%w", errDiscard)
return fmt.Errorf("%w", errDiscard)
}

data := make([]byte, dataLenAligned)
if cap(rec.RawSample) < int(dataLenAligned) {
rec.RawSample = make([]byte, dataLenAligned)
} else {
rec.RawSample = rec.RawSample[:dataLenAligned]
}

if _, err := io.ReadFull(rd, data); err != nil {
return Record{}, fmt.Errorf("can't read sample: %w", err)
if _, err := io.ReadFull(rd, rec.RawSample); err != nil {
return fmt.Errorf("read sample: %w", err)
}

rd.storeConsumer()

return Record{RawSample: data[:header.dataLen()]}, nil
rec.RawSample = rec.RawSample[:header.dataLen()]
return nil
}

// Reader allows reading bpf_ringbuf_output
@@ -96,6 +108,7 @@ type Reader struct {
mu sync.Mutex
ring *ringbufEventRing
epollEvents []unix.EpollEvent
header []byte
}

// NewReader creates a new BPF ringbuf reader.
@@ -129,6 +142,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
poller: poller,
ring: ring,
epollEvents: make([]unix.EpollEvent, 1),
header: make([]byte, ringbufHeaderSize),
}, nil
}

@@ -159,24 +173,30 @@ func (r *Reader) Close() error {
//
// Calling Close interrupts the function.
func (r *Reader) Read() (Record, error) {
var rec Record
return rec, r.ReadInto(&rec)
}

// ReadInto is like Read except that it allows reusing Record and associated buffers.
func (r *Reader) ReadInto(rec *Record) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.ring == nil {
return Record{}, fmt.Errorf("ringbuffer: %w", ErrClosed)
return fmt.Errorf("ringbuffer: %w", ErrClosed)
}

for {
_, err := r.poller.Wait(r.epollEvents)
if err != nil {
return Record{}, err
return err
}

record, err := readRecord(r.ring)
err = readRecord(r.ring, rec, r.header)
if errors.Is(err, errBusy) || errors.Is(err, errDiscard) {
continue
}

return record, err
return err
}
}
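The ringbuf reader follows the same shape as the perf one. A minimal consumer loop, assuming the ReadInto API from this PR; consume and handle are hypothetical names for this sketch:

package sketch

import (
	"errors"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/ringbuf"
)

// consume drains a BPF_MAP_TYPE_RINGBUF map into a single reused Record,
// returning nil once the reader is closed.
func consume(rb *ebpf.Map, handle func([]byte)) error {
	rd, err := ringbuf.NewReader(rb)
	if err != nil {
		return err
	}
	defer rd.Close()

	var rec ringbuf.Record
	for {
		if err := rd.ReadInto(&rec); err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				return nil
			}
			return err
		}
		handle(rec.RawSample)
	}
}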