diff --git a/ringbuf/reader.go b/ringbuf/reader.go index 64e5fec6f..e59c2cc1e 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -20,6 +20,8 @@ var ( errBusy = errors.New("sample not committed yet") ) +var ringbufHeaderSize = binary.Size(ringbufHeader{}) + // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c type ringbufHeader struct { Len uint32 @@ -42,16 +44,22 @@ type Record struct { RawSample []byte } -func readRecord(rd *ringbufEventRing) (r Record, err error) { +// Read a record from an event ring. +// +// buf must be at least ringbufHeaderSize bytes long. sampleBuf may be nil. +func readRecord(rd *ringbufEventRing, buf, sampleBuf []byte) (r Record, err error) { rd.loadConsumer() - var header ringbufHeader - err = binary.Read(rd, internal.NativeEndian, &header) - if err == io.EOF { + + buf = buf[:ringbufHeaderSize] + if _, err := io.ReadFull(rd, buf); err == io.EOF { return Record{}, err + } else if err != nil { + return Record{}, fmt.Errorf("read event header: %w", err) } - if err != nil { - return Record{}, fmt.Errorf("can't read event header: %w", err) + header := ringbufHeader{ + internal.NativeEndian.Uint32(buf[0:4]), + internal.NativeEndian.Uint32(buf[4:8]), } if header.isBusy() { @@ -76,10 +84,15 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) { return Record{}, fmt.Errorf("%w", errDiscard) } - data := make([]byte, dataLenAligned) + var data []byte + if cap(sampleBuf) < int(dataLenAligned) { + data = make([]byte, dataLenAligned) + } else { + data = sampleBuf[:dataLenAligned] + } if _, err := io.ReadFull(rd, data); err != nil { - return Record{}, fmt.Errorf("can't read sample: %w", err) + return Record{}, fmt.Errorf("read sample: %w", err) } rd.storeConsumer() @@ -96,6 +109,7 @@ type Reader struct { mu sync.Mutex ring *ringbufEventRing epollEvents []unix.EpollEvent + header []byte } // NewReader creates a new BPF ringbuf reader. @@ -129,6 +143,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { poller: poller, ring: ring, epollEvents: make([]unix.EpollEvent, 1), + header: make([]byte, ringbufHeaderSize), }, nil } @@ -159,6 +174,15 @@ func (r *Reader) Close() error { // // Calling Close interrupts the function. func (r *Reader) Read() (Record, error) { + return r.ReadBuffer(nil) +} + +// ReadBuffer is like Read except that it allows reusing buffers. +// +// buf is used as Record.RawSample if it is large enough to hold the sample data. +// If buf is too small a new buffer will be allocated instead. It is valid to +// pass nil, in which case ReadBuffer behaves like Read. +func (r *Reader) ReadBuffer(buf []byte) (Record, error) { r.mu.Lock() defer r.mu.Unlock() @@ -172,7 +196,7 @@ func (r *Reader) Read() (Record, error) { return Record{}, err } - record, err := readRecord(r.ring) + record, err := readRecord(r.ring, r.header, buf) if errors.Is(err, errBusy) || errors.Is(err, errDiscard) { continue } diff --git a/ringbuf/reader_test.go b/ringbuf/reader_test.go index 34ef6ec8f..03f41a680 100644 --- a/ringbuf/reader_test.go +++ b/ringbuf/reader_test.go @@ -239,6 +239,7 @@ func BenchmarkReader(b *testing.B) { b.ResetTimer() b.ReportAllocs() + var sampleBuf []byte for i := 0; i < b.N; i++ { ret, _, err := prog.Test(buf) if err != nil { @@ -246,10 +247,13 @@ func BenchmarkReader(b *testing.B) { } else if errno := syscall.Errno(-int32(ret)); errno != 0 { b.Fatal("Expected 0 as return value, got", errno) } - _, err = rd.Read() + + record, err := rd.ReadBuffer(sampleBuf) if err != nil { b.Fatal("Can't read samples:", err) } + + sampleBuf = record.RawSample } }) }