Skip to content

Commit

Permalink
ringbuf: add ReadBuffer method
Browse files Browse the repository at this point in the history
Add a ReadBuffer method to ringbuf.Reader, analogous to perf.Reader.

    name                                   old time/op    new time/op    delta
    Reader/normal_epoll_with_timeout_-1-4    8.58µs ± 3%    9.07µs ± 4%     ~     (p=0.057 n=4+4)

    name                                   old alloc/op   new alloc/op   delta
    Reader/normal_epoll_with_timeout_-1-4      384B ± 0%      288B ± 0%  -25.00%  (p=0.029 n=4+4)

    name                                   old allocs/op  new allocs/op  delta
    Reader/normal_epoll_with_timeout_-1-4      4.00 ± 0%      1.00 ± 0%  -75.00%  (p=0.029 n=4+4)
  • Loading branch information
lmb committed May 6, 2022
1 parent 0846dad commit f398af5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 10 deletions.
42 changes: 33 additions & 9 deletions ringbuf/reader.go
Expand Up @@ -20,6 +20,8 @@ var (
errBusy = errors.New("sample not committed yet")
)

var ringbufHeaderSize = binary.Size(ringbufHeader{})

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
Expand All @@ -42,16 +44,22 @@ type Record struct {
RawSample []byte
}

func readRecord(rd *ringbufEventRing) (r Record, err error) {
// Read a record from an event ring.
//
// buf must be at least ringbufHeaderSize bytes long. sampleBuf may be nil.
func readRecord(rd *ringbufEventRing, buf, sampleBuf []byte) (r Record, err error) {
rd.loadConsumer()
var header ringbufHeader
err = binary.Read(rd, internal.NativeEndian, &header)
if err == io.EOF {

buf = buf[:ringbufHeaderSize]
if _, err := io.ReadFull(rd, buf); err == io.EOF {
return Record{}, err
} else if err != nil {
return Record{}, fmt.Errorf("read event header: %w", err)
}

if err != nil {
return Record{}, fmt.Errorf("can't read event header: %w", err)
header := ringbufHeader{
internal.NativeEndian.Uint32(buf[0:4]),
internal.NativeEndian.Uint32(buf[4:8]),
}

if header.isBusy() {
Expand All @@ -76,10 +84,15 @@ func readRecord(rd *ringbufEventRing) (r Record, err error) {
return Record{}, fmt.Errorf("%w", errDiscard)
}

data := make([]byte, dataLenAligned)
var data []byte
if cap(sampleBuf) < int(dataLenAligned) {
data = make([]byte, dataLenAligned)
} else {
data = sampleBuf[:dataLenAligned]
}

if _, err := io.ReadFull(rd, data); err != nil {
return Record{}, fmt.Errorf("can't read sample: %w", err)
return Record{}, fmt.Errorf("read sample: %w", err)
}

rd.storeConsumer()
Expand All @@ -96,6 +109,7 @@ type Reader struct {
mu sync.Mutex
ring *ringbufEventRing
epollEvents []unix.EpollEvent
header []byte
}

// NewReader creates a new BPF ringbuf reader.
Expand Down Expand Up @@ -129,6 +143,7 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
poller: poller,
ring: ring,
epollEvents: make([]unix.EpollEvent, 1),
header: make([]byte, ringbufHeaderSize),
}, nil
}

Expand Down Expand Up @@ -159,6 +174,15 @@ func (r *Reader) Close() error {
//
// Calling Close interrupts the function.
func (r *Reader) Read() (Record, error) {
return r.ReadBuffer(nil)
}

// ReadBuffer is like Read except that it allows reusing buffers.
//
// buf is used as Record.RawSample if it is large enough to hold the sample data.
// If buf is too small a new buffer will be allocated instead. It is valid to
// pass nil, in which case ReadBuffer behaves like Read.
func (r *Reader) ReadBuffer(buf []byte) (Record, error) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -172,7 +196,7 @@ func (r *Reader) Read() (Record, error) {
return Record{}, err
}

record, err := readRecord(r.ring)
record, err := readRecord(r.ring, r.header, buf)
if errors.Is(err, errBusy) || errors.Is(err, errDiscard) {
continue
}
Expand Down
6 changes: 5 additions & 1 deletion ringbuf/reader_test.go
Expand Up @@ -239,17 +239,21 @@ func BenchmarkReader(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

var sampleBuf []byte
for i := 0; i < b.N; i++ {
ret, _, err := prog.Test(buf)
if err != nil {
b.Fatal(err)
} else if errno := syscall.Errno(-int32(ret)); errno != 0 {
b.Fatal("Expected 0 as return value, got", errno)
}
_, err = rd.Read()

record, err := rd.ReadBuffer(sampleBuf)
if err != nil {
b.Fatal("Can't read samples:", err)
}

sampleBuf = record.RawSample
}
})
}
Expand Down

0 comments on commit f398af5

Please sign in to comment.