Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split logic #367

Merged
merged 11 commits into from
Oct 26, 2023
16 changes: 16 additions & 0 deletions buf-common.go
@@ -0,0 +1,16 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

const (
// Maximum number of channels (RingBuffers + PerfBuffers) supported
maxEventChannels = 512
)

var (
eventChannels = newRWArray(maxEventChannels)
)
112 changes: 112 additions & 0 deletions buf-perf.go
@@ -0,0 +1,112 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

import (
"fmt"
"sync"
"syscall"
)

//
// PerfBuffer
//

type PerfBuffer struct {
pb *C.struct_perf_buffer
bpfMap *BPFMap
slot uint
eventsChan chan []byte
lostChan chan uint64
stop chan struct{}
closed bool
wg sync.WaitGroup
}

// Poll will wait until timeout in milliseconds to gather
// data from the perf buffer.
func (pb *PerfBuffer) Poll(timeout int) {
pb.stop = make(chan struct{})
pb.wg.Add(1)
go pb.poll(timeout)
}

// Deprecated: use PerfBuffer.Poll() instead.
func (pb *PerfBuffer) Start() {
pb.Poll(300)
}

func (pb *PerfBuffer) Stop() {
if pb.stop == nil {
return
}

// Signal the poll goroutine to exit
close(pb.stop)

// The event and lost channels should be drained here since the consumer
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
go func() {
// revive:disable:empty-block
for range pb.eventsChan {
}

if pb.lostChan != nil {
for range pb.lostChan {
}
}
// revive:enable:empty-block
}()

// Wait for the poll goroutine to exit
pb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(pb.eventsChan)
if pb.lostChan != nil {
close(pb.lostChan)
}

// Reset pb.stop to allow multiple safe calls to Stop()
pb.stop = nil
}

func (pb *PerfBuffer) Close() {
if pb.closed {
return
}

pb.Stop()
C.perf_buffer__free(pb.pb)
eventChannels.remove(pb.slot)
pb.closed = true
}

// todo: consider writing the perf polling in go as c to go calls (callback) are expensive
func (pb *PerfBuffer) poll(timeout int) error {
defer pb.wg.Done()

for {
select {
case <-pb.stop:
return nil
default:
retC := C.perf_buffer__poll(pb.pb, C.int(timeout))
if retC < 0 {
errno := syscall.Errno(-retC)
if errno == syscall.EINTR {
continue
}

return fmt.Errorf("error polling perf buffer: %w", errno)
}
}
}
}
112 changes: 112 additions & 0 deletions buf-ring.go
@@ -0,0 +1,112 @@
package libbpfgo

/*
#cgo LDFLAGS: -lelf -lz
#include "libbpfgo.h"
*/
import "C"

import (
"fmt"
"sync"
"syscall"
)

//
// RingBuffer
//

type RingBuffer struct {
rb *C.struct_ring_buffer
bpfMap *BPFMap
slot uint
stop chan struct{}
closed bool
wg sync.WaitGroup
}

// Poll will wait until timeout in milliseconds to gather
// data from the ring buffer.
func (rb *RingBuffer) Poll(timeout int) {
rb.stop = make(chan struct{})
rb.wg.Add(1)
go rb.poll(timeout)
}

// Deprecated: use RingBuffer.Poll() instead.
func (rb *RingBuffer) Start() {
rb.Poll(300)
}

func (rb *RingBuffer) Stop() {
if rb.stop == nil {
return
}

// Signal the poll goroutine to exit
close(rb.stop)

// The event channel should be drained here since the consumer
// may have stopped at this point. Failure to drain it will
// result in a deadlock: the channel will fill up and the poll
// goroutine will block in the callback.
eventChan := eventChannels.get(rb.slot).(chan []byte)
go func() {
// revive:disable:empty-block
for range eventChan {
}
// revive:enable:empty-block
}()

// Wait for the poll goroutine to exit
rb.wg.Wait()

// Close the channel -- this is useful for the consumer but
// also to terminate the drain goroutine above.
close(eventChan)

// Reset pb.stop to allow multiple safe calls to Stop()
rb.stop = nil
}

func (rb *RingBuffer) Close() {
if rb.closed {
return
}

rb.Stop()
C.ring_buffer__free(rb.rb)
eventChannels.remove(rb.slot)
rb.closed = true
}

func (rb *RingBuffer) isStopped() bool {
select {
case <-rb.stop:
return true
default:
return false
}
}

func (rb *RingBuffer) poll(timeout int) error {
defer rb.wg.Done()

for {
retC := C.ring_buffer__poll(rb.rb, C.int(timeout))
if rb.isStopped() {
break
}

if retC < 0 {
errno := syscall.Errno(-retC)
if errno == syscall.EINTR {
continue
}

return fmt.Errorf("error polling ring buffer: %w", errno)
}
}

return nil
}
1 change: 1 addition & 0 deletions elf.go
Expand Up @@ -57,5 +57,6 @@ func isGlobalVariableSection(sectionName string) bool {
strings.HasPrefix(sectionName, ".rodata.") {
return true
}

return false
}
1 change: 1 addition & 0 deletions libbpf_cb.go
Expand Up @@ -28,6 +28,7 @@ func perfLostCallback(ctx unsafe.Pointer, cpu C.int, cnt C.ulonglong) {
func ringbufferCallback(ctx unsafe.Pointer, data unsafe.Pointer, size C.int) C.int {
ch := eventChannels.get(uint(uintptr(ctx))).(chan []byte)
ch <- C.GoBytes(data, size)

return C.int(0)
}

Expand Down