Skip to content

Commit

Permalink
ringbuf: add ReadRecord method
Browse files Browse the repository at this point in the history
Add a ReadRecord method to ringbuf.Reader, analogous to perf.Reader.

    name                                   time/op
    Reader/normal_epoll_with_timeout_-1-4  8.87µs ± 1%
    ReadRecord-4                           8.90µs ± 3%

    name                                   alloc/op
    Reader/normal_epoll_with_timeout_-1-4    368B ± 0%
    ReadRecord-4                             288B ± 0%

    name                                   allocs/op
    Reader/normal_epoll_with_timeout_-1-4    2.00 ± 0%
    ReadRecord-4                             1.00 ± 0%
  • Loading branch information
lmb committed May 10, 2022
1 parent 867355a commit 21b84fd
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 18 deletions.
60 changes: 42 additions & 18 deletions ringbuf/reader.go
Expand Up @@ -20,6 +20,8 @@ var (
errBusy = errors.New("sample not committed yet")
)

var ringbufHeaderSize = binary.Size(ringbufHeader{})

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
Expand All @@ -42,23 +44,29 @@ type Record struct {
RawSample []byte
}

func readRecord(rd *ringbufEventRing) (r Record, err error) {
// Read a record from an event ring.
//
// buf must be at least ringbufHeaderSize bytes long.
func readRecord(rd *ringbufEventRing, rec *Record, buf []byte) error {
rd.loadConsumer()
var header ringbufHeader
err = binary.Read(rd, internal.NativeEndian, &header)
if err == io.EOF {
return Record{}, err

buf = buf[:ringbufHeaderSize]
if _, err := io.ReadFull(rd, buf); err == io.EOF {
return err
} else if err != nil {
return fmt.Errorf("read event header: %w", err)
}

if err != nil {
return Record{}, fmt.Errorf("can't read event header: %w", err)
header := ringbufHeader{
internal.NativeEndian.Uint32(buf[0:4]),
internal.NativeEndian.Uint32(buf[4:8]),
}

if header.isBusy() {
// the next sample in the ring is not committed yet so we
// exit without storing the reader/consumer position
// and start again from the same position.
return Record{}, fmt.Errorf("%w", errBusy)
return fmt.Errorf("%w", errBusy)
}

/* read up to 8 byte alignment */
Expand All @@ -73,18 +81,22 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) {
rd.skipRead(dataLenAligned)
rd.storeConsumer()

return Record{}, fmt.Errorf("%w", errDiscard)
return fmt.Errorf("%w", errDiscard)
}

data := make([]byte, dataLenAligned)
if cap(rec.RawSample) < int(dataLenAligned) {
rec.RawSample = make([]byte, dataLenAligned)
} else {
rec.RawSample = rec.RawSample[:dataLenAligned]
}

if _, err := io.ReadFull(rd, data); err != nil {
return Record{}, fmt.Errorf("can't read sample: %w", err)
if _, err := io.ReadFull(rd, rec.RawSample); err != nil {
return fmt.Errorf("read sample: %w", err)
}

rd.storeConsumer()

return Record{RawSample: data[:header.dataLen()]}, nil
rec.RawSample = rec.RawSample[:header.dataLen()]
return nil
}

// Reader allows reading bpf_ringbuf_output
Expand All @@ -96,6 +108,7 @@ type Reader struct {
mu sync.Mutex
ring *ringbufEventRing
epollEvents []unix.EpollEvent
header []byte
}

// NewReader creates a new BPF ringbuf reader.
Expand Down Expand Up @@ -129,6 +142,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
poller: poller,
ring: ring,
epollEvents: make([]unix.EpollEvent, 1),
header: make([]byte, ringbufHeaderSize),
}, nil
}

Expand Down Expand Up @@ -159,24 +173,34 @@ func (r *Reader) Close() error {
//
// Calling Close interrupts the function.
func (r *Reader) Read() (Record, error) {
var rec Record
return rec, r.ReadRecord(&rec)
}

// ReadRecord is like Read except that it allows reusing buffers.
//
// buf is used as Record.RawSample if it is large enough to hold the sample data.
// If buf is too small a new buffer will be allocated instead. It is valid to
// pass nil, in which case ReadRecord behaves like Read.
func (r *Reader) ReadRecord(rec *Record) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.ring == nil {
return Record{}, fmt.Errorf("ringbuffer: %w", ErrClosed)
return fmt.Errorf("ringbuffer: %w", ErrClosed)
}

for {
_, err := r.poller.Wait(r.epollEvents)
if err != nil {
return Record{}, err
return err
}

record, err := readRecord(r.ring)
err = readRecord(r.ring, rec, r.header)
if errors.Is(err, errBusy) || errors.Is(err, errDiscard) {
continue
}

return record, err
return err
}
}
31 changes: 31 additions & 0 deletions ringbuf/reader_test.go
Expand Up @@ -254,3 +254,34 @@ func BenchmarkReader(b *testing.B) {
})
}
}

func BenchmarkReadRecord(b *testing.B) {
testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer")

prog, events := mustOutputSamplesProg(b, 0, 80)

rd, err := NewReader(events)
if err != nil {
b.Fatal(err)
}
defer rd.Close()

buf := make([]byte, 14)

b.ResetTimer()
b.ReportAllocs()

var rec Record
for i := 0; i < b.N; i++ {
ret, _, err := prog.Test(buf)
if err != nil {
b.Fatal(err)
} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
b.Fatal("Expected 0 as return value, got", errno)
}

if err := rd.ReadRecord(&rec); err != nil {
b.Fatal("Can't read samples:", err)
}
}
}

0 comments on commit 21b84fd

Please sign in to comment.