Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.

Already on GitHub? Sign in to your account

profiler: Batch API fixes #395

Merged
merged 1 commit into from May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -106,7 +106,7 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent out of sync libbpf dependency, we nmight want to try directly linking/updating the submodule in the libbpf-go.
# TODO(kakkoyun): To prevent out of sync libbpf dependency, we might want to try directly linking/updating the submodule in the libbpf-go.
# - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)
Expand Down
128 changes: 78 additions & 50 deletions pkg/profiler/profiler.go
Expand Up @@ -80,7 +80,20 @@ type stackCountKey struct {

// bpfMaps holds references to the eBPF maps the profiler reads from:
// counts aggregates per-stack sample counts, stacks holds the captured
// stack traces ("stack_traces" map) keyed by stack ID.
type bpfMaps struct {
	counts *bpf.BPFMap
	stacks *bpf.BPFMap
}

func (m bpfMaps) clean(stacks []int32, logger log.Logger) {
for _, stackId := range stacks {
javierhonduco marked this conversation as resolved.
Show resolved Hide resolved
err := m.stacks.DeleteKey(unsafe.Pointer(&stackId))
if err != nil {
if !errors.Is(err, syscall.ENOENT) {
// Continuing in case of an error as we still want to delete the rest of the
// stacks in the slice.
level.Debug(logger).Log("msg", "failed to delete stack trace", "errno", err)
javierhonduco marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

type metrics struct {
Expand Down Expand Up @@ -317,11 +330,11 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

traces, err := m.GetMap("stack_traces")
stacks, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, traces: traces}
p.bpfMaps = &bpfMaps{counts: counts, stacks: stacks}

// Allocate this here, so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
Expand Down Expand Up @@ -376,52 +389,42 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)

// Variables needed for eBPF map batch iteration.
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
nextCountKey = uintptr(1)
// Pointer to the next batch, filled in by the kernel.
nextCountKey = uintptr(0)
javierhonduco marked this conversation as resolved.
Show resolved Hide resolved
)

// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

batchSize := 0
it := p.bpfMaps.counts.Iterator()
for it.Next() {
batchSize++
}
if err := it.Err(); err != nil {
return fmt.Errorf("iterate over counts map: %w", err)
}

if batchSize == 0 {
return nil
}
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
time.Sleep(1 * time.Second)

var (
values [][]byte
err error
)
values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))

batchSize := p.bpfMaps.counts.GetMaxEntries()
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)

values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), batchSize)
processedCount := len(values)

if err != nil {
switch {
case errors.Is(err, syscall.EPERM):
level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err)
// return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err)
return nil

case errors.Is(err, syscall.ENOENT):
level.Debug(p.logger).Log("msg", "no values in batch")
return nil

default:
return fmt.Errorf("get value and delete batch: %w", err)
}
return fmt.Errorf("get value and delete batch failed with: %w", err)
}
if len(values) == 0 {
level.Debug(p.logger).Log("msg", "no values in batch")

if processedCount == 0 {
level.Error(p.logger).Log("msg", "no values in batch")
return nil
}

// We are getting and deleting the whole map, so there should not be a next batch.
if nextCountKey != uintptr(0) {
level.Debug(p.logger).Log("msg", "Next batch should be null", "nextBatch", nextCountKey)
}

level.Debug(p.logger).Log("msg", "get value and delete batch", "batchSize", batchSize, "processedCount", processedCount)

stacksKeys := make(map[int32]bool, processedCount)

for i, key := range p.countKeys {
var (
pid = key.PID
Expand All @@ -433,23 +436,32 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
continue
}

// Don't go over more stacks than we've fetched.
if i > processedCount {
break
}

// Twice the stack depth because we have a user and a potential Kernel stack.
// Read order matters, since we read from the key buffer.
stack := stack{}
userErr := p.getAndDeleteUserStack(userStackID, &stack)
userErr := p.readUserStack(userStackID, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelErr := p.getAndDeleteKernelStack(kernelStackID, &stack)
stacksKeys[userStackID] = true

kernelErr := p.readKernelStack(kernelStackID, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
}
level.Debug(p.logger).Log("msg", "failed to read kernel stack", "err", kernelErr)
}
stacksKeys[kernelStackID] = true

if userErr != nil && kernelErr != nil {
// Both stacks are missing. Nothing to do.
continue
Expand Down Expand Up @@ -529,6 +541,29 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
samples[stack] = sample
}

// Delete all stacks.
//
// The capacity will be difficult to estimate without counting as it's
// likely that there will be more than we need due to duplicated stack IDs.
stacksKeySlice := make([]int32, 0, len(stacksKeys))
for key, _ := range stacksKeys {
stacksKeySlice = append(stacksKeySlice, key)
}

// TODO(javierhonduco): Getting -ENOTSUPP, perhaps my kernel doesn't support it?
// Needs more investigation.
//
// processed, err := p.bpfMaps.stacks.DeleteKeyBatch(unsafe.Pointer(&stacksKeySlice[0]), uint32(len(stacksKeySlice)))
// level.Debug(p.logger).Log("msg", "deleted counts map in batch", "deleted", processed)
// if err != nil {
// return fmt.Errorf("failed to delete stacks in batch: %w", err)
//
// }

// Remove the stacktraces one by one. We need to do it at the end as we might
// be deleting entries we need in subsequent iterations.
p.bpfMaps.clean(stacksKeySlice, p.logger)

prof, err := p.buildProfile(ctx, captureTime, samples, locations, kernelLocations, userLocations, mappings, kernelMapping)
if err != nil {
return fmt.Errorf("failed to build profile: %w", err)
Expand Down Expand Up @@ -684,14 +719,14 @@ func (p *CgroupProfiler) resolveKernelFunctions(kernelLocations []*profile.Locat
return kernelFunctions, nil
}

// getAndDeleteUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
// readUserStack reads the user stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readUserStack(userStackID int32, stack *stack) error {
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
Expand All @@ -701,21 +736,17 @@ func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack)
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}

return nil
}

// getAndDeleteKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
// readKernelStack reads the kernel stack trace from the stacktraces ebpf map into the given buffer and deletes it.
func (p *CgroupProfiler) readKernelStack(kernelStackID int32, stack *stack) error {
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
stackBytes, err := p.bpfMaps.stacks.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
Expand All @@ -725,9 +756,6 @@ func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *sta
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
return fmt.Errorf("unable to delete stack trace key: %w", err)
}
return nil
}

Expand Down