diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 64e5fec6f..3b1f26ef7 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -20,6 +20,8 @@ var ( errBusy = errors.New("sample not committed yet") ) +var ringbufHeaderSize = binary.Size(ringbufHeader{}) + // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c type ringbufHeader struct { Len uint32 @@ -42,23 +44,29 @@ type Record struct { RawSample []byte } -func readRecord(rd *ringbufEventRing) (r Record, err error) { +// Read a record from an event ring. +// +// buf must be at least ringbufHeaderSize bytes long. +func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error { rd.loadConsumer() - var header ringbufHeader - err = binary.Read(rd, internal.NativeEndian, &header) - if err == io.EOF { - return Record{}, err + + buf = buf[:ringbufHeaderSize] + if _, err := io.ReadFull(rd, buf); err == io.EOF { + return err + } else if err != nil { + return fmt.Errorf("read event header: %w", err) } - if err != nil { - return Record{}, fmt.Errorf("can't read event header: %w", err) + header := ringbufHeader{ + internal.NativeEndian.Uint32(buf[0:4]), + internal.NativeEndian.Uint32(buf[4:8]), } if header.isBusy() { // the next sample in the ring is not committed yet so we // exit without storing the reader/consumer position // and start again from the same position. - return Record{}, fmt.Errorf("%w", errBusy) + return fmt.Errorf("%w", errBusy) } /* read up to 8 byte alignment */ @@ -73,18 +81,22 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) { rd.skipRead(dataLenAligned) rd.storeConsumer() - return Record{}, fmt.Errorf("%w", errDiscard) + return fmt.Errorf("%w", errDiscard) } - data := make([]byte, dataLenAligned) + if cap(rec.RawSample) < int(dataLenAligned) { + rec.RawSample = make([]byte, dataLenAligned) + } else { + rec.RawSample = rec.RawSample[:dataLenAligned] + } - if _, err := io.ReadFull(rd, data); err != nil { - return Record{}, fmt.Errorf("can't read sample: %w", err) + if _, err := io.ReadFull(rd, rec.RawSample); err != nil { + return fmt.Errorf("read sample: %w", err) } rd.storeConsumer() - - return Record{RawSample: data[:header.dataLen()]}, nil + rec.RawSample = rec.RawSample[:header.dataLen()] + return nil } // Reader allows reading bpf_ringbuf_output @@ -96,6 +108,7 @@ type Reader struct { mu sync.Mutex ring *ringbufEventRing epollEvents []unix.EpollEvent + header []byte } // NewReader creates a new BPF ringbuf reader. @@ -129,6 +142,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { poller: poller, ring: ring, epollEvents: make([]unix.EpollEvent, 1), + header: make([]byte, ringbufHeaderSize), }, nil } @@ -159,24 +173,30 @@ func (r *Reader) Close() error { // // Calling Close interrupts the function. func (r *Reader) Read() (Record, error) { + var rec Record + return rec, r.ReadInto(&rec) +} + +// ReadInto is like Read except that it allows reusing Record and associated buffers. +func (r *Reader) ReadInto(rec *Record) error { r.mu.Lock() defer r.mu.Unlock() if r.ring == nil { - return Record{}, fmt.Errorf("ringbuffer: %w", ErrClosed) + return fmt.Errorf("ringbuffer: %w", ErrClosed) } for { _, err := r.poller.Wait(r.epollEvents) if err != nil { - return Record{}, err + return err } - record, err := readRecord(r.ring) + err = readRecord(r.ring, rec, r.header) if errors.Is(err, errBusy) || errors.Is(err, errDiscard) { continue } - return record, err + return err } } diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 34ef6ec8f..71e1ea04b 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -254,3 +254,34 @@ func BenchmarkReader(b *testing.B) { }) } } + +func BenchmarkReadInto(b *testing.B) { + testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(b, 0, 80) + + rd, err := NewReader(events) + if err != nil { + b.Fatal(err) + } + defer rd.Close() + + buf := make([]byte, 14) + + b.ResetTimer() + b.ReportAllocs() + + var rec Record + for i := 0; i < b.N; i++ { + ret, _, err := prog.Test(buf) + if err != nil { + b.Fatal(err) + } else if errno := syscall.Errno(-int32(ret)); errno != 0 { + b.Fatal("Expected 0 as return value, got", errno) + } + + if err := rd.ReadInto(&rec); err != nil { + b.Fatal("Can't read samples:", err) + } + } +}