Skip to content

Commit

Permalink
Merge pull request #394 from parca-dev/ebpf_batch
Browse files Browse the repository at this point in the history
profiler: Use BPF batch operations
  • Loading branch information
kakkoyun committed May 13, 2022
2 parents 4c45b9f + 6d8c316 commit b29cdb4
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 97 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Dockerfile
/images
/.github
/tmp
*.bpf.o
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
run: sudo apt-get install llvm libelf-dev

- name: Install golangci-lint
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.45.0
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.46.0

- name: Install gofumpt
run: go install mvdan.cc/gofumpt@latest
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ linters:
- unused
disable:
- exhaustivestruct
- exhaustruct
- funlen
- gci
- gochecknoglobals
Expand All @@ -30,6 +31,7 @@ linters:
- lll
- maligned
- nlreturn
- nonamedreturns
- paralleltest
- scopelint
- testpackage
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent out of sync libbpf dependency, we might want to try directly linking/updating the submodule in the libbpf-go.
# - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)

Expand Down
225 changes: 129 additions & 96 deletions pkg/profiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ const (

// stack is a raw stack trace buffer of instruction addresses. It is sized
// doubleStackDepth because one buffer holds a user stack followed by a
// potential kernel stack (see the read order in profileLoop).
type stack [doubleStackDepth]uint64

// bpfMaps bundles the eBPF maps the profiler reads from kernel space.
type bpfMaps struct {
	// counts maps a stack-count key to the number of times that stack was sampled.
	counts *bpf.BPFMap
	// stackTraces holds captured stack traces, keyed by stack ID.
	stackTraces *bpf.BPFMap
}

// stackCountKey mirrors the struct in parca-agent.bpf.c
// NOTICE: The memory layout and alignment of the struct currently matches the struct in parca-agent.bpf.c.
// However, keep in mind that Go compiler injects padding to align the struct fields to be a multiple of 8 bytes.
Expand All @@ -83,54 +78,22 @@ type stackCountKey struct {
KernelStackID int32
}

func (m bpfMaps) clean() error {
// BPF iterators need the previous value to iterate to the next, so we
// can only delete the "previous" item once we've already iterated to
// the next.

it := m.stackTraces.Iterator()
var prev []byte = nil
for it.Next() {
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}
// bpfMaps bundles the eBPF maps the profiler reads from kernel space.
type bpfMaps struct {
	// counts maps a stack-count key to the number of times that stack was sampled.
	counts *bpf.BPFMap
	// stacks holds captured stack traces, keyed by stack ID
	// (the "stack_traces" map in the BPF program).
	stacks *bpf.BPFMap
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.stackTraces.DeleteKey(unsafe.Pointer(&prev[0]))
func (m bpfMaps) clean(stacks []int32, logger log.Logger) {
for _, stackID := range stacks {
err := m.stacks.DeleteKey(unsafe.Pointer(&stackID))
if err != nil {
return fmt.Errorf("failed to delete stack trace: %w", err)
}
}

it = m.counts.Iterator()
prev = nil
for it.Next() {
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
if !errors.Is(err, syscall.ENOENT) {
// Continuing in case of an error as we still want to delete the rest of the
// stacks in the slice.
level.Debug(logger).Log("msg", "failed to delete stack trace", "errno", err)
}
}

key := it.Key()
prev = make([]byte, len(key))
copy(prev, key)
}
if prev != nil {
err := m.counts.DeleteKey(unsafe.Pointer(&prev[0]))
if err != nil {
return fmt.Errorf("failed to delete count: %w", err)
}
}

return nil
}

type metrics struct {
Expand Down Expand Up @@ -199,6 +162,7 @@ type CgroupProfiler struct {

bpfMaps *bpfMaps
byteOrder binary.ByteOrder
countKeys []stackCountKey

lastError error
lastProfileTakenAt time.Time
Expand Down Expand Up @@ -366,11 +330,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

stackTraces, err := m.GetMap("stack_traces")
stacks, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, stackTraces: stackTraces}
p.bpfMaps = &bpfMaps{counts: counts, stacks: stacks}

// Allocate this here, so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
// this slice will be around 122Kb. We allocate enough to read the entire
// map instead of using the batch iteration feature because it vastly
// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
// If allocating this much memory upfront is a problem we can always revisit and use
// smaller batch sizes.
p.countKeys = make([]stackCountKey, counts.GetMaxEntries())

ticker := time.NewTicker(p.profilingDuration)
defer ticker.Stop()
Expand All @@ -393,7 +366,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
}
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) (err error) {
// loopReport records the outcome of the most recent profiling loop.
// Both fields are written while holding p.mtx so readers observe a
// consistent timestamp/error pair.
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
	p.mtx.Lock()
	p.lastError = lastError
	p.lastProfileTakenAt = lastProfileTakenAt
	p.mtx.Unlock()
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
var (
mappings = maps.NewMapping(p.pidMappingFileCache)
kernelMapping = &profile.Mapping{
Expand All @@ -405,48 +386,88 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
kernelLocations = []*profile.Location{}
userLocations = map[uint32][]*profile.Location{} // PID -> []*profile.Location
locationIndices = map[[2]uint64]int{} // [PID, Address] -> index in locations

// Variables needed for eBPF map batch iteration.
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
// Pointer to the next batch, filled in by the kernel.
nextCountKey = uintptr(0)
)

// TODO(kakkoyun): Use libbpf batch functions.
it := p.bpfMaps.counts.Iterator()
for it.Next() {
// This byte slice is only valid for this iteration, so it must be
// copied if we want to do anything with it outside this loop.
keyBytes := it.Key()

var key stackCountKey
// NOTICE: This works because the key struct in Go and the key struct in C has exactly the same memory layout.
// See the comment in stackCountKey for more details.
if err := binary.Read(bytes.NewBuffer(keyBytes), p.byteOrder, &key); err != nil {
return fmt.Errorf("read stack count key: %w", err)
// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

var (
values [][]byte
err error
)

batchSize := p.bpfMaps.counts.GetMaxEntries()
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)

values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), batchSize)
processedCount := len(values)

if err != nil {
return fmt.Errorf("get value and delete batch failed with: %w", err)
}

if processedCount == 0 {
level.Error(p.logger).Log("msg", "no values in batch")
return nil
}

// We are getting and deleting the whole map, so there should not be a next batch.
if nextCountKey != uintptr(0) {
level.Debug(p.logger).Log("msg", "Next batch should be null", "nextBatch", nextCountKey)
}

level.Debug(p.logger).Log("msg", "get value and delete batch", "batchSize", batchSize, "processedCount", processedCount)

stacksKeys := make(map[int32]bool, processedCount)

for i, key := range p.countKeys {
var (
pid = key.PID
userStackID = key.UserStackID
kernelStackID = key.KernelStackID
)

if pid == 0 {
continue
}

// Don't go over more stacks than we've fetched.
if i > processedCount {
break
}

// Twice the stack depth because we have a user and a potential Kernel stack.
// Read order matters, since we read from the key buffer.
stack := stack{}
userErr := p.readUserStack(key.UserStackID, &stack)
userErr := p.readUserStack(userStackID, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelErr := p.readKernelStack(key.KernelStackID, &stack)
stacksKeys[userStackID] = true

kernelErr := p.readKernelStack(kernelStackID, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
}
level.Debug(p.logger).Log("msg", "failed to read kernel stack", "err", kernelErr)
}
stacksKeys[kernelStackID] = true

if userErr != nil && kernelErr != nil {
// Both stacks are missing. Nothing to do.
continue
}

value, err := p.readValue(keyBytes)
if err != nil {
return fmt.Errorf("read value: %w", err)
}
value := p.byteOrder.Uint64(values[i])
if value == 0 {
// This should never happen, but it's here just in case.
// If we have a zero value, we don't want to add it to the profile.
Expand Down Expand Up @@ -519,10 +540,30 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
}
samples[stack] = sample
}
if it.Err() != nil {
return fmt.Errorf("failed iterator: %w", it.Err())

// Delete all stacks.
//
// The capacity will be difficult to estimate without counting as it's
// likely that there will be more than we need due to duplicated stack IDs.
stacksKeySlice := make([]int32, 0, len(stacksKeys))
for key := range stacksKeys {
stacksKeySlice = append(stacksKeySlice, key)
}

// TODO(javierhonduco): Getting -ENOTSUPP, perhaps my kernel doesn't support it?
// Needs more investigation.
//
// processed, err := p.bpfMaps.stacks.DeleteKeyBatch(unsafe.Pointer(&stacksKeySlice[0]), uint32(len(stacksKeySlice)))
// level.Debug(p.logger).Log("msg", "deleted counts map in batch", "deleted", processed)
// if err != nil {
// return fmt.Errorf("failed to delete stacks in batch: %w", err)
//
// }

// Remove the stacktraces one by one. We need to do it at the end as we might
// be deleting entries we need in subsequent iterations.
p.bpfMaps.clean(stacksKeySlice, p.logger)

prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
if err != nil {
return fmt.Errorf("failed to build profile: %w", err)
Expand All @@ -532,10 +573,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
level.Error(p.logger).Log("msg", "failed to send profile", "err", err)
}

if err := p.bpfMaps.clean(); err != nil {
level.Warn(p.logger).Log("msg", "failed to clean BPF maps", "err", err)
}

ksymCacheStats := p.ksymCache.Stats
level.Debug(p.logger).Log("msg", "Kernel symbol cache stats", "stats", ksymCacheStats.String())
p.metrics.ksymCacheHitRate.WithLabelValues("hits").Add(float64(ksymCacheStats.Hits))
Expand All @@ -544,14 +581,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
return nil
}

// loopReport records the outcome of the most recent profiling loop under
// p.mtx so that concurrent readers see a consistent timestamp/error pair.
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	p.lastProfileTakenAt = lastProfileTakenAt
	p.lastError = lastError
}

func (p *CgroupProfiler) buildProfile(
ctx context.Context,
captureTime time.Time,
Expand Down Expand Up @@ -690,14 +719,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
return kernelFunctions, nil
}

// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer.
// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
Expand All @@ -710,14 +739,14 @@ func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
return nil
}

// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer.
// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
Expand All @@ -730,15 +759,6 @@ func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) erro
return nil
}

// readValue looks up keyBytes in the counts ebpf map and decodes the stored
// counter using the profiler's configured byte order.
func (p *CgroupProfiler) readValue(keyBytes []byte) (uint64, error) {
	raw, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
	if err != nil {
		return 0, fmt.Errorf("get count value: %w", err)
	}
	return p.byteOrder.Uint64(raw), nil
}

// normalizeProfile calculates the base addresses of a position-independent binary and normalizes captured locations accordingly.
func (p *CgroupProfiler) normalizeAddress(m *profile.Mapping, pid uint32, addr uint64) uint64 {
if m == nil {
Expand Down Expand Up @@ -827,3 +847,16 @@ func (p *CgroupProfiler) bumpMemlockRlimit() error {

return nil
}

// memsetCountKeys resets every element of in to v.
// It seeds the first element and then repeatedly doubles the initialized
// prefix with the optimized copy builtin, so the slice is filled using only
// O(log n) copy calls. Note the total work is still O(n) element copies;
// only the number of copy invocations is logarithmic.
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
	if len(in) == 0 {
		return
	}
	in[0] = v
	// Invariant: in[:bp] is already set to v; each copy doubles that prefix.
	for bp := 1; bp < len(in); bp *= 2 {
		copy(in[bp:], in[:bp])
	}
}

0 comments on commit b29cdb4

Please sign in to comment.