Skip to content

Commit

Permalink
Fix batch operation issues
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Apr 14, 2022
1 parent 8ab7d7a commit d44bf31
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 34 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Expand Up @@ -6,3 +6,4 @@ Dockerfile
/images
/.github
/tmp
*.bpf.o
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -101,6 +101,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
.PHONY: $(bpf_compile_tools)
$(bpf_compile_tools): % : check_%

# TODO(kakkoyun): To prevent an out-of-sync libbpf dependency, we might want to try directly linking/updating the submodule in libbpf-go.
# - Determining the location of the go module cache dir and initializing the submodule in there and linking in here, should be doable.
$(LIBBPF_SRC):
test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)

Expand Down
2 changes: 1 addition & 1 deletion deploy/dev.jsonnet
Expand Up @@ -14,7 +14,7 @@ function(serverVersion='v0.4.2')
version: serverVersion,
replicas: 1,
corsAllowedOrigins: '*',
logLevel: 'debug',
// logLevel: 'debug',
});

local agent = (import 'parca-agent/parca-agent.libsonnet')({
Expand Down
2 changes: 1 addition & 1 deletion deploy/tilt/parca-server-deployment.yaml
Expand Up @@ -27,7 +27,7 @@ spec:
- args:
- /parca
- --config-path=/var/parca/parca.yaml
- --log-level=debug
- --log-level=info
- --cors-allowed-origins=*
image: ghcr.io/parca-dev/parca:v0.10.0
livenessProbe:
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/write_client.go
Expand Up @@ -105,7 +105,7 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
}

if len(batch) > 0 {
level.Debug(b.logger).Log("msg", "batch write client sent profiles", "count", len(batch))
level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
}
return nil
}
Expand Down
102 changes: 71 additions & 31 deletions pkg/profiler/profiler.go
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -71,9 +72,8 @@ type stackCountKey struct {
}

type bpfMaps struct {
counts *bpf.BPFMap
stackTraces *bpf.BPFMap
countKeys []stackCountKey
counts *bpf.BPFMap
traces *bpf.BPFMap
}

type CgroupProfiler struct {
Expand All @@ -88,7 +88,8 @@ type CgroupProfiler struct {
ksymCache *ksym.Cache
objCache objectfile.Cache

bpfMaps *bpfMaps
bpfMaps *bpfMaps
countKeys []stackCountKey

missingStacks *prometheus.CounterVec
lastError error
Expand Down Expand Up @@ -146,14 +147,6 @@ func NewCgroupProfiler(
}
}

// loopReport records the result of the latest profiling iteration.
// It stores the capture timestamp and the error (nil on success) under
// the write lock so that LastProfileTakenAt/LastError readers, which
// take the read lock, observe a consistent pair.
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
p.mtx.Lock()
defer p.mtx.Unlock()

p.lastProfileTakenAt = lastProfileTakenAt
p.lastError = lastError
}

func (p *CgroupProfiler) LastProfileTakenAt() time.Time {
p.mtx.RLock()
defer p.mtx.RUnlock()
Expand Down Expand Up @@ -264,20 +257,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
return fmt.Errorf("get counts map: %w", err)
}

traces, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, traces: traces}

// Allocate this here, so it's only allocated once instead of every
// time that p.profileLoop is called below. This is because, as of now,
// this slice will be around 122Kb. We allocate enough to read the entire
// map instead of using the batch iteration feature because it vastly
// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
// If allocating this much memory upfront is a problem we can always revisit and use
// smaller batch sizes.
countKeys := make([]stackCountKey, counts.GetMaxEntries())

stackTraces, err := m.GetMap("stack_traces")
if err != nil {
return fmt.Errorf("get stack traces map: %w", err)
}
p.bpfMaps = &bpfMaps{counts: counts, countKeys: countKeys, stackTraces: stackTraces}
p.countKeys = make([]stackCountKey, counts.GetMaxEntries())

ticker := time.NewTicker(p.profilingDuration)
defer ticker.Stop()
Expand All @@ -301,6 +294,13 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
}

func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
defer func() {
if err := recover(); err != nil {
level.Error(p.logger).Log("msg", "profile loop recovered from panic", "err", err)
debug.PrintStack()
}
}()

var (
prof = &profile.Profile{
SampleType: []*profile.ValueType{{
Expand Down Expand Up @@ -332,23 +332,55 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
byteOrder = byteorder.GetHostByteOrder()

// Variables needed for eBPF map batch iteration.
keysPtr = unsafe.Pointer(&p.bpfMaps.countKeys[0])
nextKey = uintptr(1)
countKeysPtr = unsafe.Pointer(&p.countKeys[0])
nextCountKey = uintptr(1)
)

memsetCountKeys(p.bpfMaps.countKeys, stackCountKey{})
// Reset count keys before collecting new traces from the kernel.
memsetCountKeys(p.countKeys, stackCountKey{})

// TODO(kakkoyun): Is there a neat way to find out the number of entries in the map?
batchSize := 0
it := p.bpfMaps.counts.Iterator()
for it.Next() {
batchSize++
}
if err := it.Err(); err != nil {
return fmt.Errorf("iterate over counts map: %w", err)
}

counts := p.bpfMaps.counts
vals, err := counts.GetValueAndDeleteBatch(keysPtr, nil, unsafe.Pointer(&nextKey), counts.GetMaxEntries())
if batchSize == 0 {
return nil
}
level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
time.Sleep(1 * time.Second)

var (
values [][]byte
err error
)
values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))
if err != nil {
// TODO(kakkoyun): Check if errors are correct!
if !errors.Is(err, syscall.ENOENT) { // Map is empty or we got all keys in the last batch.
return err
switch {
case errors.Is(err, syscall.EPERM):
level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err)
//return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err)
return nil

case errors.Is(err, syscall.ENOENT):
level.Debug(p.logger).Log("msg", "no values in batch")
return nil

default:
return fmt.Errorf("get value and delete batch: %w", err)
}
}
if len(values) == 0 {
level.Debug(p.logger).Log("msg", "no values in batch")
return nil
}

// keys := p.bpfMaps.counts
for i, key := range p.bpfMaps.countKeys {
for i, key := range p.countKeys {
var (
pid = key.pid
userStackID = key.userStackID
Expand All @@ -359,9 +391,9 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
break
}

value := byteOrder.Uint64(vals[i])
value := byteOrder.Uint64(values[i])

stackTraces := p.bpfMaps.stackTraces
stackTraces := p.bpfMaps.traces
stackBytes, err := stackTraces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.missingStacks.WithLabelValues("user").Inc()
Expand Down Expand Up @@ -548,6 +580,14 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
return nil
}

// loopReport saves the outcome of the most recent profiling iteration:
// the time the profile was captured and the error it produced (nil when
// the iteration succeeded). The write lock guards both fields so readers
// using the read lock always see a matching timestamp/error pair.
func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
	p.mtx.Lock()
	p.lastProfileTakenAt = lastProfileTakenAt
	p.lastError = lastError
	p.mtx.Unlock()
}

func (p *CgroupProfiler) normalizeAddress(m *profile.Mapping, pid uint32, addr uint64) uint64 {
if m == nil {
return addr
Expand Down

0 comments on commit d44bf31

Please sign in to comment.