From 20cd78164737a842d564f700e8f0b70a7c5ebe15 Mon Sep 17 00:00:00 2001 From: Francisco Javier Honduvilla Coto Date: Thu, 21 Apr 2022 14:02:09 +0200 Subject: [PATCH] profiler: Batch API fixes This PR is based off [@kakkoyun's work](https://github.com/parca-dev/parca-agent/pull/326) to use libbpf(go)'s batch APIs. **Context** The main issue we found while working with this API was that they were erroring with `EPERM`. After some debugging, we realised that libbpf wasn't handle with errors in the way we expected. The debugging write-up and more context can be found [here](https://github.com/aquasecurity/libbpfgo/issues/159), and the fix is in [this PR](https://github.com/aquasecurity/libbpfgo/pull/157). After landing these changes upstream, we pointed to the updated libbpfgo version, as well as added some [regression tests](https://github.com/parca-dev/parca-agent/pull/381) to ensure that libbpfgo behaves as expected, and to make it easier in the future to write further compatibility tests. Note: the rest of the batch APIs error handling is still unfixed. Tracking in https://github.com/aquasecurity/libbpfgo/issues/162. --- Makefile | 2 +- pkg/profiler/profiler.go | 130 ++++++++++++++++++++++++--------------- 2 files changed, 81 insertions(+), 51 deletions(-) diff --git a/Makefile b/Makefile index 295b0192a6..07afd3bdea 100644 --- a/Makefile +++ b/Makefile @@ -106,7 +106,7 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG) .PHONY: $(bpf_compile_tools) $(bpf_compile_tools): % : check_% -# TODO(kakkoyun): To prevent out of sync libbpf dependency, we nmight want to try directly linking/updating the submodule in the libbpf-go. +# TODO(kakkoyun): To prevent out of sync libbpf dependency, we might want to try directly linking/updating the submodule in the libbpf-go. # - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable. $(LIBBPF_SRC): test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false) diff --git a/pkg/profiler/profiler.go b/pkg/profiler/profiler.go index 48c0c2d0f0..af72b4dbaf 100644 --- a/pkg/profiler/profiler.go +++ b/pkg/profiler/profiler.go @@ -80,7 +80,18 @@ type stackCountKey struct { type bpfMaps struct { counts *bpf.BPFMap - traces *bpf.BPFMap + stacks *bpf.BPFMap +} + +func (m bpfMaps) clean(stacks []int32, logger log.Logger) { + for _, stackId := range stacks { + err := m.stacks.DeleteKey(unsafe.Pointer(&stackId)) + if err != nil { + if !errors.Is(err, syscall.ENOENT) { + level.Debug(logger).Log("msg", "failed to delete stack trace", "errno", err) + } + } + } } type metrics struct { @@ -246,6 +257,11 @@ func (p *CgroupProfiler) Run(ctx context.Context) error { ctx, p.cancel = context.WithCancel(ctx) p.mtx.Unlock() + // @nocommit, enabling this here to get the right error codes. + // once the changes land upstream, we don't have to enable it + // (javierhonduco: explain this better). + // bpf.SetStrictMode(bpf.LibbpfStrictModeDirectErrs) + m, err := bpf.NewModuleFromBufferArgs(bpf.NewModuleArgs{ BPFObjBuff: bpfObj, BPFObjName: "parca", @@ -317,11 +333,11 @@ func (p *CgroupProfiler) Run(ctx context.Context) error { return fmt.Errorf("get counts map: %w", err) } - traces, err := m.GetMap("stack_traces") + stacks, err := m.GetMap("stack_traces") if err != nil { return fmt.Errorf("get stack traces map: %w", err) } - p.bpfMaps = &bpfMaps{counts: counts, traces: traces} + p.bpfMaps = &bpfMaps{counts: counts, stacks: stacks} // Allocate this here, so it's only allocated once instead of every // time that p.profileLoop is called below. This is because, as of now, @@ -376,52 +392,41 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) // Variables needed for eBPF map batch iteration. countKeysPtr = unsafe.Pointer(&p.countKeys[0]) - nextCountKey = uintptr(1) + nextCountKey = uintptr(0) ) // Reset count keys before collecting new traces from the kernel. memsetCountKeys(p.countKeys, stackCountKey{}) - batchSize := 0 - it := p.bpfMaps.counts.Iterator() - for it.Next() { - batchSize++ - } - if err := it.Err(); err != nil { - return fmt.Errorf("iterate over counts map: %w", err) - } - - if batchSize == 0 { - return nil - } - level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize) - time.Sleep(1 * time.Second) - var ( values [][]byte err error ) - values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize)) + + batchSize := p.bpfMaps.counts.GetMaxEntries() + level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize) + + values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), batchSize) + processedCount := len(values) + if err != nil { - switch { - case errors.Is(err, syscall.EPERM): - level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err) - // return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err) - return nil - - case errors.Is(err, syscall.ENOENT): - level.Debug(p.logger).Log("msg", "no values in batch") - return nil - - default: - return fmt.Errorf("get value and delete batch: %w", err) - } + level.Error(p.logger).Log("msg", "get value and delete batch failed with", "err", err) + return nil } - if len(values) == 0 { - level.Debug(p.logger).Log("msg", "no values in batch") + + if processedCount == 0 { + level.Error(p.logger).Log("msg", "no values in batch") return nil } + if nextCountKey != uintptr(0) { + level.Debug(p.logger).Log("msg", "Next batch should be null", "nextBatch", nextCountKey) + } + + level.Debug(p.logger).Log("msg", "get value and delete batch", "batchSize", batchSize, "processedCount", processedCount) + + stacksKeys := make(map[int32]bool, processedCount) + for i, key := range p.countKeys { var ( pid = key.PID @@ -433,23 +438,32 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) continue } + // Don't go over more stacks than we've fetched. + if i > processedCount { + break + } + // Twice the stack depth because we have a user and a potential Kernel stack. // Read order matters, since we read from the key buffer. stack := stack{} - userErr := p.getAndDeleteUserStack(userStackID, &stack) + userErr := p.readUserStack(userStackID, &stack) if userErr != nil { if errors.Is(userErr, errUnrecoverable) { return userErr } level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr) } - kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack) + stacksKeys[userStackID] = true + + kernelErr := p.readKernelStack(kernelStackID, &stack) if kernelErr != nil { if errors.Is(kernelErr, errUnrecoverable) { return kernelErr } level.Debug(p.logger).Log("msg", "failed to read kernel stack", "err", kernelErr) } + stacksKeys[kernelStackID] = true + if userErr != nil && kernelErr != nil { // Both stacks are missing. Nothing to do. continue @@ -529,6 +543,29 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) samples[stack] = sample } + // Delete all stacks. + // + // The capacity will be difficult to estimate without counting as it's + // likely that there will be more than we need due to duplicated stack IDs. + stacksKeysVector := make([]int32, 0, len(stacksKeys)/2) + for key, _ := range stacksKeys { + stacksKeysVector = append(stacksKeysVector, key) + } + + // TODO(javierhonduco): Getting -ENOTSUPP, perhaps my kernel doesn't support it? + // Needs more investigation. + // + // processed, err := p.bpfMaps.stacks.DeleteKeyBatch(unsafe.Pointer(&stacksKeysVector[0]), uint32(len(stacksKeysVector))) + // level.Debug(p.logger).Log("msg", "deleted counts map in batch", "deleted", processed) + // if err != nil { + // return fmt.Errorf("failed to delete stacks in batch: %w", err) + // + // } + + // Remove the stacktraces one by one. We need to do it at the end as we might + // be deleting entries we need in subsequent iterations. + p.bpfMaps.clean(stacksKeysVector, p.logger) + prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping) if err != nil { return fmt.Errorf("failed to build profile: %w", err) @@ -684,14 +721,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat return kernelFunctions, nil } -// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it. -func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error { +// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it. +func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error { if userStackID == 0 { p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc() return errors.New("user stack ID is 0, probably stack unwinding failed") } - stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID)) + stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&userStackID)) if err != nil { p.metrics.missingStacks.WithLabelValues("user").Inc() return fmt.Errorf("read user stack trace: %w", err) @@ -701,21 +738,17 @@ func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable) } - if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil { - return fmt.Errorf("unable to delete stack trace key: %w", err) - } - return nil } -// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it. -func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error { +// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it. +func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error { if kernelStackID == 0 { p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc() return errors.New("kernel stack ID is 0, probably stack unwinding failed") } - stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID)) + stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&kernelStackID)) if err != nil { p.metrics.missingStacks.WithLabelValues("kernel").Inc() return fmt.Errorf("read kernel stack trace: %w", err) @@ -725,9 +758,6 @@ func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *sta return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable) } - if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil { - return fmt.Errorf("unable to delete stack trace key: %w", err) - } return nil }