profiler: Use BPF batch operations #326

Closed
wants to merge 2 commits
1 change: 1 addition & 0 deletions .dockerignore
@@ -6,3 +6,4 @@ Dockerfile
/images
/.github
/tmp
*.bpf.o
2 changes: 2 additions & 0 deletions Makefile
@@ -106,6 +106,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent the libbpf dependency from going out of sync, we might want to try directly linking/updating the submodule in libbpf-go.
# - Determining the location of the Go module cache dir, initializing the submodule there, and linking it here should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)

209 changes: 107 additions & 102 deletions pkg/profiler/profiler.go
@@ -66,11 +66,6 @@ const (

type stack [doubleStackDepth]uint64

type bpfMaps struct {
counts *bpf.BPFMap
stackTraces *bpf.BPFMap
}

// stackCountKey mirrors the struct in parca-agent.bpf.c
// NOTICE: The memory layout and alignment of the struct currently matches the struct in parca-agent.bpf.c.
// However, keep in mind that the Go compiler injects padding to align the struct fields to be a multiple of 8 bytes.
@@ -83,54 +78,9 @@ type stackCountKey struct {
KernelStackID int32
}

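Note (editorial): as a quick illustration of the layout constraint described in the comment above, the Go key has to be byte-for-byte identical to the C key so that raw map bytes can be reinterpreted directly. A minimal sanity check might look like the sketch below; the expected numbers assume the three-field key shown in this diff (a 4-byte PID plus two 4-byte stack IDs) and are not part of the PR.

```go
// Sketch only: verify at startup that the Go key matches the expected
// 12-byte layout of the C struct (no compiler-inserted padding).
var k stackCountKey
if unsafe.Sizeof(k) != 12 ||
	unsafe.Offsetof(k.UserStackID) != 4 ||
	unsafe.Offsetof(k.KernelStackID) != 8 {
	panic("stackCountKey layout no longer matches parca-agent.bpf.c")
}
```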
func (m bpfMaps) clean() error {
// BPF iterators need the previous value to iterate to the next, so we
// can only delete the "previous" item once we've already iterated to
// the next.

it := m.stackTraces.Iterator()
var prev []byte = nil
for it.Next() {
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}

it = m.counts.Iterator()
prev = nil
for it.Next() {
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
}
}

return nil
type bpfMaps struct {
counts *bpf.BPFMap
traces *bpf.BPFMap
}

type metrics struct {
@@ -199,6 +149,7 @@ type CgroupProfiler struct {

bpfMaps *bpfMaps
byteOrder binary.ByteOrder
countKeys []stackCountKey

lastError error
lastProfileTakenAt time.Time
@@ -366,11 +317,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

stackTraces, err := m.GetMap("stack_traces")
traces, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, stackTraces: stackTraces}
p.bpfMaps = &bpfMaps{counts: counts, traces: traces}

// Allocate this slice once here rather than on every call to p.profileLoop
// below; as of now it is roughly 122 KB. We allocate enough to read the
// entire map, instead of using the batch iteration feature, because that
// vastly simplifies the code in profileLoop and the batch operations are a
// bit tricky to get right. If allocating this much memory upfront becomes a
// problem, we can always revisit this and use smaller batch sizes.
p.countKeys = make([]stackCountKey, counts.GetMaxEntries())

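Note (editorial): for a rough sense of the size mentioned in the comment, the key is 12 bytes (a uint32 PID and two int32 stack IDs), so with a hypothetical max_entries of 10240 the backing array is 10240 × 12 = 122,880 bytes ≈ 122.9 KB, matching the "roughly 122 KB" estimate. The actual max_entries value comes from the BPF map definition and is not shown in this diff.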
ticker := time.NewTicker(p.profilingDuration)
defer ticker.Stop()
@@ -393,7 +353,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
}
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) (err error) {
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
p.mtx.Lock()
defer p.mtx.Unlock()

p.lastProfileTakenAt = lastProfileTakenAt
p.lastError = lastError
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
var (
mappings = maps.NewMapping(p.pidMappingFileCache)
kernelMapping = &profile.Mapping{
@@ -405,33 +373,77 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
kernelLocations = []*profile.Location{}
userLocations = map[uint32][]*profile.Location{} // PID -> []*profile.Location
locationIndices = map[[2]uint64]int{} // [PID, Address] -> index in locations

// Variables needed for eBPF map batch iteration.
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
nextCountKey = uintptr(1)
)

// TODO(kakkoyun): Use libbpf batch functions.
// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

batchSize := 0
it := p.bpfMaps.counts.Iterator()
for it.Next() {
// This byte slice is only valid for this iteration, so it must be
// copied if we want to do anything with it outside this loop.
keyBytes := it.Key()

var key stackCountKey
// NOTICE: This works because the key struct in Go and the key struct in C has exactly the same memory layout.
// See the comment in stackCountKey for more details.
if err := binary.Read(bytes.NewBuffer(keyBytes), p.byteOrder, &key); err != nil {
return fmt.Errorf("read stack count key: %w", err)
batchSize++
}
if err := it.Err(); err != nil {
return fmt.Errorf("iterate over counts map: %w", err)
}

if batchSize == 0 {
return nil
}
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
time.Sleep(1 * time.Second)

var (
values [][]byte
err error
)
values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))
if err != nil {
switch {
case errors.Is(err, syscall.EPERM):
level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than the number of entries in the map", "err", err)
// return fmt.Errorf("get value and delete batch: requested number of items is probably greater than the number of entries in the map: %w", err)
return nil

case errors.Is(err, syscall.ENOENT):
level.Debug(p.logger).Log("msg", "no values in batch")
return nil

default:
return fmt.Errorf("get value and delete batch: %w", err)
}
}
if len(values) == 0 {
level.Debug(p.logger).Log("msg", "no values in batch")
return nil
}

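Note (editorial): the comment on countKeys above mentions smaller batch sizes as a possible follow-up. A rough sketch of what chunked draining could look like with the same GetValueAndDeleteBatch call is shown below; the chunk size, cursor handling, and ENOENT treatment are illustrative assumptions, not part of this PR.

```go
// Sketch, not the code in this PR: drain the counts map in fixed-size
// chunks. ENOENT from the batch syscall is treated as "traversal finished",
// mirroring the handling above.
const chunkSize = 1024
chunkKeys := make([]stackCountKey, chunkSize)
var cursor uint64
for {
	vals, err := p.bpfMaps.counts.GetValueAndDeleteBatch(
		unsafe.Pointer(&chunkKeys[0]), nil, unsafe.Pointer(&cursor), uint32(chunkSize))
	if errors.Is(err, syscall.ENOENT) {
		break // fewer than chunkSize entries remained; the map is drained
	}
	if err != nil {
		return fmt.Errorf("get value and delete batch: %w", err)
	}
	for i := range vals {
		// decode chunkKeys[i] and vals[i] exactly as in the loop below
		_ = p.byteOrder.Uint64(vals[i])
	}
}
```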
for i, key := range p.countKeys {
var (
pid = key.PID
userStackID = key.UserStackID
kernelStackID = key.KernelStackID
)

if pid == 0 {
continue
}

// Twice the stack depth because we have a user and a potential kernel stack.
// Read order matters, since we read from the key buffer.
stack := stack{}
userErr := p.readUserStack(key.UserStackID, &stack)
userErr := p.getAndDeleteUserStack(userStackID, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelErr := p.readKernelStack(key.KernelStackID, &stack)
kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
@@ -443,10 +455,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
continue
}

value, err := p.readValue(keyBytes)
if err != nil {
return fmt.Errorf("read value: %w", err)
}
value := p.byteOrder.Uint64(values[i])
if value == 0 {
// This should never happen, but it's here just in case.
// If we have a zero value, we don't want to add it to the profile.
Expand Down Expand Up @@ -519,9 +528,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
}
samples[stack] = sample
}
if it.Err() != nil {
return fmt.Errorf("failed iterator: %w", it.Err())
}

prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
if err != nil {
@@ -532,10 +538,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
}

if err := p.bpfMaps.clean(); err != nil {
level.Warn(p.logger).Log("msg", "failed to clean BPF maps", "err", err)
}

ksymCacheStats := p.ksymCache.Stats
level.Debug(p.logger).Log("msg", "Kernel symbol cache stats", "stats", ksymCacheStats.String())
p.metrics.ksymCacheHitRate.WithLabelValues("hits").Add(float64(ksymCacheStats.Hits))
@@ -544,14 +546,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
return nil
}

func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
p.mtx.Lock()
defer p.mtx.Unlock()

p.lastProfileTakenAt = lastProfileTakenAt
p.lastError = lastError
}

func (p *CgroupProfiler) buildProfile(
ctx context.Context,
captureTime time.Time,
@@ -690,14 +684,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
return kernelFunctions, nil
}

// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer.
func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
@@ -707,17 +701,21 @@ func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}

return nil
}

// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer.
func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
@@ -727,16 +725,10 @@ func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) erro
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

return nil
}

// readValue reads the value of the given key from the counts ebpf map.
func (p *CgroupProfiler) readValue(keyBytes []byte) (uint64, error) {
valueBytes, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
if err != nil {
return 0, fmt.Errorf("get count value: %w", err)
if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}
return p.byteOrder.Uint64(valueBytes), nil
return nil
}

// normalizeProfile calculates the base addresses of a position-independent binary and normalizes captured locations accordingly.
@@ -827,3 +819,16 @@ func (p *CgroupProfiler) bumpMemlockRlimit() error {

return nil
}

// memsetCountKeys resets the given slice to the given value.
// This function makes use of the highly optimized copy builtin function
// and fills the entire slice using O(log n) copy calls (O(n) total work).
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
if len(in) == 0 {
return
}
in[0] = v
for bp := 1; bp < len(in); bp *= 2 {
copy(in[bp:], in[:bp])
}
}
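Note (editorial): the doubling trick above is the standard Go idiom for filling a slice via copy: the initialized prefix doubles on every iteration, so a slice of length n is filled with about log2(n) copy calls while still moving O(n) bytes in total. A generic illustration of the same idiom, not part of the PR:

```go
// Illustration only: fill an int slice with v using the doubling-copy idiom.
func memsetInts(in []int, v int) {
	if len(in) == 0 {
		return
	}
	in[0] = v // seed the first element
	for bp := 1; bp < len(in); bp *= 2 {
		copy(in[bp:], in[:bp]) // double the initialized prefix
	}
}
```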