From 227837aa19cd94d9b9f531c1f5c81f42a6502097 Mon Sep 17 00:00:00 2001
From: Kemal Akkoyun
Date: Thu, 5 May 2022 16:11:03 +0200
Subject: [PATCH] Utilize libbpf batch APIs

Signed-off-by: Kemal Akkoyun
---
 .dockerignore             |   1 +
 Makefile                  |   2 +
 pkg/agent/write_client.go |   2 +-
 pkg/profiler/profiler.go  | 208 +++++++++++++++++++-------------------
 4 files changed, 109 insertions(+), 104 deletions(-)

diff --git a/.dockerignore b/.dockerignore
index 9987431760..d2a3115c6f 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -6,3 +6,4 @@ Dockerfile
 /images
 /.github
 /tmp
+*.bpf.o
diff --git a/Makefile b/Makefile
index 837b133d62..295b0192a6 100644
--- a/Makefile
+++ b/Makefile
@@ -106,6 +106,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
 .PHONY: $(bpf_compile_tools)
 $(bpf_compile_tools): % : check_%
 
+# TODO(kakkoyun): To prevent the libbpf dependency from going out of sync, we might want to try directly linking/updating the submodule in libbpf-go.
+#   - Determining the location of the Go module cache dir, initializing the submodule there, and linking it here should be doable.
 $(LIBBPF_SRC):
 	test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)
 
diff --git a/pkg/agent/write_client.go b/pkg/agent/write_client.go
index ef257e84f0..441588dad8 100644
--- a/pkg/agent/write_client.go
+++ b/pkg/agent/write_client.go
@@ -105,7 +105,7 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
 	}
 	if len(batch) > 0 {
-		level.Debug(b.logger).Log("msg", "batch write client sent profiles", "count", len(batch))
+		level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
 	}
 
 	return nil
 }
diff --git a/pkg/profiler/profiler.go b/pkg/profiler/profiler.go
index 7e04fa0366..48c0c2d0f0 100644
--- a/pkg/profiler/profiler.go
+++ b/pkg/profiler/profiler.go
@@ -66,11 +66,6 @@ const (
 
 type stack [doubleStackDepth]uint64
 
-type bpfMaps struct {
-	counts      *bpf.BPFMap
-	stackTraces *bpf.BPFMap
-}
-
 // stackCountKey mirrors the struct in parca-agent.bpf.c
 // NOTICE: The memory layout and alignment of the struct currently matches the struct in parca-agent.bpf.c.
 // However, keep in mind that Go compiler injects padding to align the struct fields to be a multiple of 8 bytes.
@@ -83,54 +78,9 @@ type stackCountKey struct {
 	KernelStackID int32
 }
 
-func (m bpfMaps) clean() error {
-	// BPF iterators need the previous value to iterate to the next, so we
-	// can only delete the "previous" item once we've already iterated to
-	// the next.
-
-	it := m.stackTraces.Iterator()
-	var prev []byte = nil
-	for it.Next() {
-		if prev != nil {
-			err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
-			if err != nil {
-				return fmt.Errorf("failed to delete stack trace: %w", err)
-			}
-		}
-
-		key := it.Key()
-		prev = make([]byte, len(key))
-		copy(prev, key)
-	}
-	if prev != nil {
-		err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
-		if err != nil {
-			return fmt.Errorf("failed to delete stack trace: %w", err)
-		}
-	}
-
-	it = m.counts.Iterator()
-	prev = nil
-	for it.Next() {
-		if prev != nil {
-			err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
-			if err != nil {
-				return fmt.Errorf("failed to delete count: %w", err)
-			}
-		}
-
-		key := it.Key()
-		prev = make([]byte, len(key))
-		copy(prev, key)
-	}
-	if prev != nil {
-		err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
-		if err != nil {
-			return fmt.Errorf("failed to delete count: %w", err)
-		}
-	}
-
-	return nil
+type bpfMaps struct {
+	counts *bpf.BPFMap
+	traces *bpf.BPFMap
 }
 
 type metrics struct {
@@ -199,6 +149,7 @@ type CgroupProfiler struct {
 	bpfMaps   *bpfMaps
 	byteOrder binary.ByteOrder
+	countKeys []stackCountKey
 
 	lastError          error
 	lastProfileTakenAt time.Time
@@ -366,11 +317,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
 		return fmt.Errorf("get counts map: %w", err)
 	}
 
-	stackTraces, err := m.GetMap("stack_traces")
+	traces, err := m.GetMap("stack_traces")
 	if err != nil {
 		return fmt.Errorf("get stack traces map: %w", err)
 	}
-	p.bpfMaps = &bpfMaps{counts: counts, stackTraces: stackTraces}
+	p.bpfMaps = &bpfMaps{counts: counts, traces: traces}
+
+	// Allocate this here, so it's only allocated once instead of every
+	// time that p.profileLoop is called below. This is because, as of now,
+	// this slice will be around 122KB. We allocate enough to read the entire
+	// map instead of using the batch iteration feature because it vastly
+	// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
+	// If allocating this much memory upfront is a problem, we can always revisit and use
+	// smaller batch sizes.
+	p.countKeys = make([]stackCountKey, counts.GetMaxEntries())
 
 	ticker := time.NewTicker(p.profilingDuration)
 	defer ticker.Stop()
@@ -393,7 +353,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
 	}
 }
 
-func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) (err error) {
+func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	p.lastProfileTakenAt = lastProfileTakenAt
+	p.lastError = lastError
+}
+
+func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
 	var (
 		mappings      = maps.NewMapping(p.pidMappingFileCache)
 		kernelMapping = &profile.Mapping{
@@ -405,33 +373,74 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
 		kernelLocations = []*profile.Location{}
 		userLocations   = map[uint32][]*profile.Location{} // PID -> []*profile.Location
 		locationIndices = map[[2]uint64]int{}              // [PID, Address] -> index in locations
+
+		// Variables needed for eBPF map batch iteration.
+		countKeysPtr = unsafe.Pointer(&p.countKeys[0])
+		nextCountKey = uintptr(1)
 	)
 
-	// TODO(kakkoyun): Use libbpf batch functions.
+	// Reset count keys before collecting new traces from the kernel.
+	memsetCountKeys(p.countKeys, stackCountKey{})
+
+	batchSize := 0
 	it := p.bpfMaps.counts.Iterator()
 	for it.Next() {
-		// This byte slice is only valid for this iteration, so it must be
-		// copied if we want to do anything with it outside this loop.
-		keyBytes := it.Key()
-
-		var key stackCountKey
-		// NOTICE: This works because the key struct in Go and the key struct in C has exactly the same memory layout.
-		// See the comment in stackCountKey for more details.
-		if err := binary.Read(bytes.NewBuffer(keyBytes), p.byteOrder, &key); err != nil {
-			return fmt.Errorf("read stack count key: %w", err)
+		batchSize++
+	}
+	if err := it.Err(); err != nil {
+		return fmt.Errorf("iterate over counts map: %w", err)
+	}
+
+	if batchSize == 0 {
+		return nil
+	}
+	level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
+
+	var (
+		values [][]byte
+		err    error
+	)
+	values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))
+	if err != nil {
+		switch {
+		case errors.Is(err, syscall.EPERM):
+			level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than the number that exist", "err", err)
+			return nil
+
+		case errors.Is(err, syscall.ENOENT):
+			level.Debug(p.logger).Log("msg", "no values in batch")
+			return nil
+
+		default:
+			return fmt.Errorf("get value and delete batch: %w", err)
+		}
+	}
+	if len(values) == 0 {
+		level.Debug(p.logger).Log("msg", "no values in batch")
+		return nil
+	}
+
+	for i, key := range p.countKeys {
+		var (
+			pid           = key.PID
+			userStackID   = key.UserStackID
+			kernelStackID = key.KernelStackID
+		)
+
+		if pid == 0 {
+			continue
 		}
 
 		// Twice the stack depth because we have a user and a potential Kernel stack.
-		// Read order matters, since we read from the key buffer.
 		stack := stack{}
-		userErr := p.readUserStack(key.UserStackID, &stack)
+		userErr := p.getAndDeleteUserStack(userStackID, &stack)
 		if userErr != nil {
 			if errors.Is(userErr, errUnrecoverable) {
 				return userErr
 			}
 			level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
 		}
-		kernelErr := p.readKernelStack(key.KernelStackID, &stack)
+		kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack)
 		if kernelErr != nil {
 			if errors.Is(kernelErr, errUnrecoverable) {
 				return kernelErr
@@ -443,10 +452,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
 			continue
 		}
 
-		value, err := p.readValue(keyBytes)
-		if err != nil {
-			return fmt.Errorf("read value: %w", err)
-		}
+		value := p.byteOrder.Uint64(values[i])
 		if value == 0 {
 			// This should never happen, but it's here just in case.
 			// If we have a zero value, we don't want to add it to the profile.
@@ -519,9 +525,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
 		}
 		samples[stack] = sample
 	}
-	if it.Err() != nil {
-		return fmt.Errorf("failed iterator: %w", it.Err())
-	}
 
 	prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
 	if err != nil {
@@ -532,10 +535,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
 		level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
 	}
 
-	if err := p.bpfMaps.clean(); err != nil {
-		level.Warn(p.logger).Log("msg", "failed to clean BPF maps", "err", err)
-	}
-
 	ksymCacheStats := p.ksymCache.Stats
 	level.Debug(p.logger).Log("msg", "Kernel symbol cache stats", "stats", ksymCacheStats.String())
 	p.metrics.ksymCacheHitRate.WithLabelValues("hits").Add(float64(ksymCacheStats.Hits))
@@ -544,14 +543,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
 	return nil
 }
 
-func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
-
-	p.lastProfileTakenAt = lastProfileTakenAt
-	p.lastError = lastError
-}
-
 func (p *CgroupProfiler) buildProfile(
 	ctx context.Context,
 	captureTime time.Time,
@@ -690,14 +681,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
 	return kernelFunctions, nil
 }
 
-// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer.
-func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
+// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
+func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
 	if userStackID == 0 {
 		p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
 		return errors.New("user stack ID is 0, probably stack unwinding failed")
 	}
 
-	stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
+	stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
 	if err != nil {
 		p.metrics.missingStacks.WithLabelValues("user").Inc()
 		return fmt.Errorf("read user stack trace: %w", err)
@@ -707,17 +698,21 @@ func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
 		return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
 	}
 
+	if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
+		return fmt.Errorf("unable to delete stack trace key: %w", err)
+	}
+
 	return nil
 }
 
-// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer.
-func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
+// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
+func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
 	if kernelStackID == 0 {
 		p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
 		return errors.New("kernel stack ID is 0, probably stack unwinding failed")
 	}
 
-	stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
+	stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
 	if err != nil {
 		p.metrics.missingStacks.WithLabelValues("kernel").Inc()
 		return fmt.Errorf("read kernel stack trace: %w", err)
@@ -727,16 +722,10 @@ func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) erro
 		return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
 	}
 
-	return nil
-}
-
-// readValue reads the value of the given key from the counts ebpf map.
-func (p *CgroupProfiler) readValue(keyBytes []byte) (uint64, error) {
-	valueBytes, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
-	if err != nil {
-		return 0, fmt.Errorf("get count value: %w", err)
+	if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
+		return fmt.Errorf("unable to delete stack trace key: %w", err)
 	}
-	return p.byteOrder.Uint64(valueBytes), nil
+	return nil
 }
 
 // normalizeProfile calculates the base addresses of a position-independent binary and normalizes captured locations accordingly.
@@ -827,3 +816,16 @@ func (p *CgroupProfiler) bumpMemlockRlimit() error {
 
 	return nil
 }
+
+// memsetCountKeys will reset the given slice to the given value.
+// This function makes use of the highly optimized copy builtin function
+// and fills the entire slice in O(n) time using only O(log n) calls to copy.
+func memsetCountKeys(in []stackCountKey, v stackCountKey) {
+	if len(in) == 0 {
+		return
+	}
+	in[0] = v
+	for bp := 1; bp < len(in); bp *= 2 {
+		copy(in[bp:], in[:bp])
+	}
+}
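
Editor's note: the memsetCountKeys function added at the end of the patch is worth a closer look. Each pass doubles the initialized prefix, so a slice of n elements is reset with about log2(n) copy calls instead of n individual writes. Below is a standalone sketch that exercises the same function; the stackCountKey fields are copied from the patch, while the main wiring is illustrative only.

package main

import "fmt"

// stackCountKey mirrors the key struct from the patch.
type stackCountKey struct {
	PID           uint32
	UserStackID   int32
	KernelStackID int32
}

// memsetCountKeys resets every element of in to v, exactly as in the patch:
// total work is O(n), but only O(log n) calls to the copy builtin are made,
// because each iteration doubles the already-initialized prefix in[:bp].
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
	if len(in) == 0 {
		return
	}
	in[0] = v
	for bp := 1; bp < len(in); bp *= 2 {
		copy(in[bp:], in[:bp])
	}
}

func main() {
	keys := make([]stackCountKey, 8)
	keys[3] = stackCountKey{PID: 42, UserStackID: 7, KernelStackID: 9}

	memsetCountKeys(keys, stackCountKey{})

	// Every slot is zeroed again, so the pid == 0 check in profileLoop
	// reliably skips entries the next batch read does not overwrite.
	fmt.Println(keys[3] == stackCountKey{}) // prints: true
}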
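For reviewers who want to see the new read path in isolation: a minimal sketch of the count-then-drain cycle that profileLoop now follows, assuming the aquasecurity/libbpfgo calls used in the patch (Iterator, Err, GetValueAndDeleteBatch). The drainCounts helper and its wiring are hypothetical names for illustration, not part of the patch.

package sketch

import (
	"encoding/binary"
	"fmt"
	"unsafe"

	bpf "github.com/aquasecurity/libbpfgo"
)

// stackCountKey must match the C key layout, as in the patch.
type stackCountKey struct {
	PID           uint32
	UserStackID   int32
	KernelStackID int32
}

// drainCounts is a hypothetical helper showing the same pattern as
// profileLoop: count the live entries, then read and delete them all in
// one batch call. keys must be sized to the map's max entries, as done
// once in Run via counts.GetMaxEntries().
func drainCounts(counts *bpf.BPFMap, keys []stackCountKey, order binary.ByteOrder) error {
	// First pass: count entries so the batch call asks for exactly as
	// many items as the map currently holds.
	batchSize := 0
	it := counts.Iterator()
	for it.Next() {
		batchSize++
	}
	if err := it.Err(); err != nil {
		return fmt.Errorf("iterate over counts map: %w", err)
	}
	if batchSize == 0 {
		return nil
	}

	// One call fetches the keys into our slice, returns the raw values,
	// and deletes the entries from the kernel map.
	var nextKey uintptr
	values, err := counts.GetValueAndDeleteBatch(
		unsafe.Pointer(&keys[0]), nil, unsafe.Pointer(&nextKey), uint32(batchSize))
	if err != nil {
		return fmt.Errorf("get value and delete batch: %w", err)
	}

	// keys[i] identifies the PID and stack IDs that produced values[i].
	for i := range values {
		fmt.Printf("pid=%d count=%d\n", keys[i].PID, order.Uint64(values[i]))
	}
	return nil
}

One design consequence worth noting: entries added to the map between the counting pass and the batch call are simply left for the next profiling round, while the EPERM branch in the patch guards the opposite case, where fewer entries exist than were requested.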